IMPALA-5773: Correctly account for memory used in data stream receiver queue

DataStreamRecvrs keep one or more queues of received batches to provide
some buffering. Each queue has a fixed byte-size capacity. The estimate
of a new RowBatch's contribution to that queue used the compressed size
of the TRowBatch it would be deserialized from, which is the wrong value
(since the batch is uncompressed after deserialization).

* Add RowBatch::GetSerializedSize(const TRowBatch&) and
  RowBatch::GetDeserializedSize(const TRowBatch&), replacing
  RowBatch::GetBatchSize().
* Fix the estimate to use the uncompressed size.
* Add a DataStreamReceiver child profile to the exchange node so that the
  peak memory used by the receiver can be monitored easily.

Confirmed that the following query:

select count(distinct concat(cast(l_comment as char(120)),
                             cast(l_comment as char(120)),
                             cast(l_comment as char(120)),
                             cast(l_comment as char(120)),
                             cast(l_comment as char(120)),
                             cast(l_comment as char(120))) from lineitem;

succeeds with a mem-limit of 800MB. Before this patch, it would fail in a
one-node cluster because the data stream receiver would buffer more
batches than the memory limit allows.

Change-Id: I9e90f9596ee984438e3373af05e84d361702ca6a
Reviewed-on: http://gerrit.cloudera.org:8080/7646
Tested-by: Impala Public Jenkins
Reviewed-by: Henry Robinson <he...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f2f52a8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f2f52a8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f2f52a8e

Branch: refs/heads/master
Commit: f2f52a8e1ce9560329566ee71945b3901a1ef958
Parents: 200ce49
Author: Henry Robinson <he...@cloudera.com>
Authored: Tue Aug 8 12:21:05 2017 -0700
Committer: Henry Robinson <he...@cloudera.com>
Committed: Sat Aug 12 01:49:50 2017 +0000

----------------------------------------------------------------------
 .../benchmarks/row-batch-serialize-benchmark.cc |  4 +--
 be/src/exec/exchange-node.cc                    |  4 +--
 be/src/runtime/data-stream-mgr-base.h           |  4 +--
 be/src/runtime/data-stream-mgr.cc               |  6 ++---
 be/src/runtime/data-stream-mgr.h                |  4 +--
 be/src/runtime/data-stream-recvr.cc             | 27 ++++++++++----------
 be/src/runtime/data-stream-recvr.h              |  8 +++---
 be/src/runtime/data-stream-sender.cc            | 12 ++++-----
 be/src/runtime/krpc-data-stream-mgr.cc          | 11 ++++----
 be/src/runtime/krpc-data-stream-mgr.h           |  4 +--
 be/src/runtime/row-batch.cc                     | 11 ++++++--
 be/src/runtime/row-batch.h                      |  7 +++--
 12 files changed, 54 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/benchmarks/row-batch-serialize-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc 
b/be/src/benchmarks/row-batch-serialize-benchmark.cc
index 350252d..ee531e6 100644
--- a/be/src/benchmarks/row-batch-serialize-benchmark.cc
+++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc
@@ -140,9 +140,7 @@ class RowBatchSerializeBaseline {
       VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << 
compressed_size;
     }
 
-    // The size output_batch would be if we didn't compress tuple_data (will 
be equal to
-    // actual batch size if tuple_data isn't compressed)
-    return batch->GetBatchSize(*output_batch) - 
output_batch->tuple_data.size() + size;
+    return RowBatch::GetDeserializedSize(*output_batch);
   }
 
   // Copy of baseline version without dedup logic

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 9b5f548..a0c002a 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -38,8 +38,8 @@ DECLARE_int32(stress_datastream_recvr_delay_ms);
 
 using namespace impala;
 
-DEFINE_int32(exchg_node_buffer_size_bytes, 1024 * 1024 * 10,
-             "(Advanced) Maximum size of per-query receive-side buffer");
+DEFINE_int64(exchg_node_buffer_size_bytes, 1024 * 1024 * 10,
+    "(Advanced) Maximum size of per-query receive-side buffer");
 
 ExchangeNode::ExchangeNode(
     ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/data-stream-mgr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr-base.h 
b/be/src/runtime/data-stream-mgr-base.h
index 53798b1..7886cfa 100644
--- a/be/src/runtime/data-stream-mgr-base.h
+++ b/be/src/runtime/data-stream-mgr-base.h
@@ -44,8 +44,8 @@ class DataStreamMgrBase {
   /// Create a receiver for a specific fragment_instance_id/node_id 
destination;
   virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
       const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int buffer_size, 
RuntimeProfile* profile,
-      bool is_merging) = 0;
+      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
+      RuntimeProfile* profile, bool is_merging) = 0;
 
   /// Notifies the recvr associated with the fragment/node id that the 
specified
   /// sender has closed.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc 
b/be/src/runtime/data-stream-mgr.cc
index 82bc01f..a161ad3 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -77,8 +77,8 @@ inline uint32_t DataStreamMgr::GetHashValue(
 
 shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(RuntimeState* state,
     const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-    PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* 
profile,
-    bool is_merging) {
+    PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
+    RuntimeProfile* profile, bool is_merging) {
   DCHECK(profile != NULL);
   VLOG_FILE << "creating receiver for fragment="
             << fragment_instance_id << ", node=" << dest_node_id;
@@ -172,7 +172,7 @@ Status DataStreamMgr::AddData(const TUniqueId& 
fragment_instance_id,
     PlanNodeId dest_node_id, const TRowBatch& thrift_batch, int sender_id) {
   VLOG_ROW << "AddData(): fragment_instance_id=" << fragment_instance_id
            << " node=" << dest_node_id
-           << " size=" << RowBatch::GetBatchSize(thrift_batch);
+           << " size=" << RowBatch::GetDeserializedSize(thrift_batch);
   bool already_unregistered;
   shared_ptr<DataStreamRecvr> recvr = FindRecvrOrWait(fragment_instance_id, 
dest_node_id,
       &already_unregistered);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 5dae908..eff468e 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -77,8 +77,8 @@ class DataStreamMgr : public DataStreamMgrBase {
   /// caller.
   std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
       const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int buffer_size, 
RuntimeProfile* profile,
-      bool is_merging) override;
+      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
+      RuntimeProfile* profile, bool is_merging) override;
 
   /// Adds a row batch to the recvr identified by 
fragment_instance_id/dest_node_id
   /// if the recvr has not been cancelled. sender_id identifies the sender 
instance

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc 
b/be/src/runtime/data-stream-recvr.cc
index 3828a3e..0c6d98e 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -151,8 +151,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const 
TRowBatch& thrift_batch) {
   unique_lock<mutex> l(lock_);
   if (is_cancelled_) return;
 
-  int batch_size = RowBatch::GetBatchSize(thrift_batch);
-  COUNTER_ADD(recvr_->bytes_received_counter_, batch_size);
+  COUNTER_ADD(recvr_->bytes_received_counter_, 
RowBatch::GetSerializedSize(thrift_batch));
   DCHECK_GT(num_remaining_senders_, 0);
 
   // if there's something in the queue and this batch will push us over the
@@ -162,6 +161,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const 
TRowBatch& thrift_batch) {
   // received from a specific queue based on data order, and the pipeline will 
stall
   // if the merger is waiting for data from an empty queue that cannot be 
filled because
   // the limit has been reached.
+  int64_t batch_size = RowBatch::GetDeserializedSize(thrift_batch);
   while (!batch_queue_.empty() && recvr_->ExceedsLimit(batch_size) && 
!is_cancelled_) {
     CANCEL_SAFE_SCOPED_TIMER(recvr_->buffer_full_total_timer_, &is_cancelled_);
     VLOG_ROW << " wait removal: empty=" << (batch_queue_.empty() ? 1 : 0)
@@ -285,7 +285,7 @@ void DataStreamRecvr::TransferAllResources(RowBatch* 
transfer_batch) {
 
 DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* 
parent_tracker,
     const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-    PlanNodeId dest_node_id, int num_senders, bool is_merging, int 
total_buffer_limit,
+    PlanNodeId dest_node_id, int num_senders, bool is_merging, int64_t 
total_buffer_limit,
     RuntimeProfile* profile)
   : mgr_(stream_mgr),
     fragment_instance_id_(fragment_instance_id),
@@ -295,7 +295,6 @@ DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, 
MemTracker* parent_t
     is_merging_(is_merging),
     num_buffered_bytes_(0),
     profile_(profile) {
-  mem_tracker_.reset(new MemTracker(-1, "DataStreamRecvr", parent_tracker));
   // Create one queue per sender if is_merging is true.
   int num_queues = is_merging ? num_senders : 1;
   sender_queues_.reserve(num_queues);
@@ -306,17 +305,19 @@ DataStreamRecvr::DataStreamRecvr(DataStreamMgr* 
stream_mgr, MemTracker* parent_t
     sender_queues_.push_back(queue);
   }
 
+  RuntimeProfile* child_profile = profile_->CreateChild("DataStreamReceiver");
+  mem_tracker_.reset(
+      new MemTracker(child_profile, -1, "DataStreamRecvr", parent_tracker));
+
   // Initialize the counters
-  bytes_received_counter_ =
-      ADD_COUNTER(profile_, "BytesReceived", TUnit::BYTES);
+  bytes_received_counter_ = ADD_COUNTER(child_profile, "BytesReceived", 
TUnit::BYTES);
   bytes_received_time_series_counter_ =
-      ADD_TIME_SERIES_COUNTER(profile_, "BytesReceived", 
bytes_received_counter_);
-  deserialize_row_batch_timer_ =
-      ADD_TIMER(profile_, "DeserializeRowBatchTimer");
-  buffer_full_wall_timer_ = ADD_TIMER(profile_, "SendersBlockedTimer");
-  buffer_full_total_timer_ = ADD_TIMER(profile_, 
"SendersBlockedTotalTimer(*)");
-  data_arrival_timer_ = profile_->inactive_timer();
-  first_batch_wait_total_timer_ = ADD_TIMER(profile_, 
"FirstBatchArrivalWaitTime");
+      ADD_TIME_SERIES_COUNTER(child_profile, "BytesReceived", 
bytes_received_counter_);
+  deserialize_row_batch_timer_ = ADD_TIMER(child_profile, 
"DeserializeRowBatchTimer");
+  buffer_full_wall_timer_ = ADD_TIMER(child_profile, "SendersBlockedTimer");
+  buffer_full_total_timer_ = ADD_TIMER(child_profile, 
"SendersBlockedTotalTimer(*)");
+  data_arrival_timer_ = child_profile->inactive_timer();
+  first_batch_wait_total_timer_ = ADD_TIMER(child_profile, 
"FirstBatchArrivalWaitTime");
 }
 
 Status DataStreamRecvr::GetNext(RowBatch* output_batch, bool* eos) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h 
b/be/src/runtime/data-stream-recvr.h
index 468f58a..fad588d 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -104,8 +104,8 @@ class DataStreamRecvr : public DataStreamRecvrBase {
 
   DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
       const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, bool is_merging, int 
total_buffer_limit,
-      RuntimeProfile* profile);
+      PlanNodeId dest_node_id, int num_senders, bool is_merging,
+      int64_t total_buffer_limit, RuntimeProfile* profile);
 
   /// Add a new batch of rows to the appropriate sender queue, blocking if the 
queue is
   /// full. Called from DataStreamMgr.
@@ -120,7 +120,7 @@ class DataStreamRecvr : public DataStreamRecvrBase {
 
   /// Return true if the addition of a new batch of size 'batch_size' would 
exceed the
   /// total buffer limit.
-  bool ExceedsLimit(int batch_size) {
+  bool ExceedsLimit(int64_t batch_size) {
     return num_buffered_bytes_.Load() + batch_size > total_buffer_limit_;
   }
 
@@ -144,7 +144,7 @@ class DataStreamRecvr : public DataStreamRecvrBase {
   bool is_merging_;
 
   /// total number of bytes held across all sender queues.
-  AtomicInt32 num_buffered_bytes_;
+  AtomicInt64 num_buffered_bytes_;
 
   /// Memtracker for batches in the sender queue(s).
   boost::scoped_ptr<MemTracker> mem_tracker_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc 
b/be/src/runtime/data-stream-sender.cc
index 16cf87e..752e3ce 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -218,7 +218,7 @@ void DataStreamSender::Channel::TransmitDataHelper(const 
TRowBatch* batch) {
   if (res.status.status_code != TErrorCode::OK) {
     rpc_status_ = res.status;
   } else {
-    num_data_bytes_sent_ += RowBatch::GetBatchSize(*batch);
+    num_data_bytes_sent_ += RowBatch::GetSerializedSize(*batch);
     VLOG_ROW << "incremented #data_bytes_sent="
              << num_data_bytes_sent_;
   }
@@ -507,17 +507,15 @@ void DataStreamSender::Close(RuntimeState* state) {
   closed_ = true;
 }
 
-Status DataStreamSender::SerializeBatch(RowBatch* src, TRowBatch* dest, int 
num_receivers) {
+Status DataStreamSender::SerializeBatch(
+    RowBatch* src, TRowBatch* dest, int num_receivers) {
   VLOG_ROW << "serializing " << src->num_rows() << " rows";
   {
     SCOPED_TIMER(profile_->total_time_counter());
     SCOPED_TIMER(serialize_batch_timer_);
     RETURN_IF_ERROR(src->Serialize(dest));
-    int bytes = RowBatch::GetBatchSize(*dest);
-    int uncompressed_bytes = bytes - dest->tuple_data.size() + 
dest->uncompressed_size;
-    // The size output_batch would be if we didn't compress tuple_data (will 
be equal to
-    // actual batch size if tuple_data isn't compressed)
-
+    int64_t bytes = RowBatch::GetSerializedSize(*dest);
+    int64_t uncompressed_bytes = RowBatch::GetDeserializedSize(*dest);
     COUNTER_ADD(bytes_sent_counter_, bytes * num_receivers);
     COUNTER_ADD(uncompressed_bytes_counter_, uncompressed_bytes * 
num_receivers);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc 
b/be/src/runtime/krpc-data-stream-mgr.cc
index 0515897..a3ed417 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -36,13 +36,12 @@ namespace impala {
   AbortUnsupportedFeature();
 }
 
-KrpcDataStreamMgr::~KrpcDataStreamMgr() {
-}
+KrpcDataStreamMgr::~KrpcDataStreamMgr(){}
 
-[[noreturn]] std::shared_ptr<DataStreamRecvrBase> 
KrpcDataStreamMgr::CreateRecvr(RuntimeState* state,
-    const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-    PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* 
profile,
-    bool is_merging) {
+    [[noreturn]] std::shared_ptr<DataStreamRecvrBase> 
KrpcDataStreamMgr::CreateRecvr(
+        RuntimeState* state, const RowDescriptor* row_desc,
+        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
+        int64_t buffer_size, RuntimeProfile* profile, bool is_merging) {
   AbortUnsupportedFeature();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h 
b/be/src/runtime/krpc-data-stream-mgr.h
index adaebda..ef9bb45 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -38,8 +38,8 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
 
   [[noreturn]] std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* 
state,
       const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int buffer_size, 
RuntimeProfile* profile,
-      bool is_merging) override;
+      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
+      RuntimeProfile* profile, bool is_merging) override;
 
   [[noreturn]] Status CloseSender(const TUniqueId& fragment_instance_id,
       PlanNodeId dest_node_id, int sender_id) override;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 942ac05..beed671 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -340,8 +340,15 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) {
   Reset();
 }
 
-int RowBatch::GetBatchSize(const TRowBatch& batch) {
-  int result = batch.tuple_data.size();
+int64_t RowBatch::GetDeserializedSize(const TRowBatch& batch) {
+  int64_t result = batch.uncompressed_size;
+  result += batch.row_tuples.size() * sizeof(TTupleId);
+  result += batch.tuple_offsets.size() * sizeof(int32_t);
+  return result;
+}
+
+int64_t RowBatch::GetSerializedSize(const TRowBatch& batch) {
+  int64_t result = batch.tuple_data.size();
   result += batch.row_tuples.size() * sizeof(TTupleId);
   result += batch.tuple_offsets.size() * sizeof(int32_t);
   return result;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f2f52a8e/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 35a8f14..49dd066 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -300,8 +300,11 @@ class RowBatch {
   /// it is ignored. This function does not Reset().
   Status Serialize(TRowBatch* output_batch);
 
-  /// Utility function: returns total size of batch.
-  static int GetBatchSize(const TRowBatch& batch);
+  /// Utility function: returns total byte size of a batch in either 
serialized or
+  /// deserialized form. If a row batch is compressed, its serialized size can 
be much
+  /// less than the deserialized size.
+  static int64_t GetSerializedSize(const TRowBatch& batch);
+  static int64_t GetDeserializedSize(const TRowBatch& batch);
 
   int ALWAYS_INLINE num_rows() const { return num_rows_; }
   int ALWAYS_INLINE capacity() const { return capacity_; }

Reply via email to