IMPALA-6396: Exchange node's memory usage should include its receiver's

A DataStreamRecvr is co-owned by the DataStreamMgr and
an Exchange node. However, the lifetime of the memory
allocations (e.g. row batches) of a DataStreamRecvr never
exceeds that of its owning Exchange node. Previously, we
used the fragment instance's MemTracker as the parent of
the DataStreamRecvr's MemTracker. This change switches to
using the MemTracker of the owning Exchange node as the
parent tracker of the DataStreamRecvr. This makes it
easier to identify the peak memory usage of the receivers
of different exchange nodes in the runtime profile and
query summary. Most of the exchange node's memory usage
comes from its receiver, so we don't track the peak memory
usage of the receiver separately.

Sample output from TPCH-Q21:

EXCHANGE_NODE (id=18):(Total: 1s448ms, non-child: 265.818ms, % non-child: 18.35%)
   - ConvertRowBatchTime: 223.895ms
   - PeakMemoryUsage: 10.04 MB (10524943)
   - RowsReturned: 1.27M (1267464)
   - RowsReturnedRate: 875.19 K/sec
  RecvrSide:
    BytesReceived(500.000ms): 0, 1.64 MB, 9.98 MB, 9.98 MB, 10.01 MB, 10.01 MB, 10.01 MB, 31.79 MB, 60.19 MB, 87.84 MB
     - FirstBatchArrivalWaitTime: 0.000ns
     - TotalBytesReceived: 93.07 MB (97594728)
     - TotalGetBatchTime: 1s194ms
       - DataArrivalTimer: 1s183ms
   SenderSide:
      - DeserializeRowBatchTime: 344.343ms
      - NumBatchesAccepted: 3.80K (3796)
      - NumBatchesDeferred: 5 (5)
      - NumEarlySenders: 0 (0)

Testing done: Updated test_observability.py to verify that the
peak memory usage of the exchange nodes is greater than 0.

Change-Id: I8ca3c47d87bfcd221d34565eda1878f3c15d5c45
Reviewed-on: http://gerrit.cloudera.org:8080/9202
Reviewed-by: Michael Ho <k...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/c3e99332
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/c3e99332
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/c3e99332

Branch: refs/heads/2.x
Commit: c3e993327365ca90d186fe534294c5c5f07de156
Parents: 1e17aba
Author: Michael Ho <k...@cloudera.com>
Authored: Thu Feb 1 13:56:31 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Sat Feb 10 04:10:16 2018 +0000

----------------------------------------------------------------------
 be/src/exec/exchange-node.cc           |  6 +++---
 be/src/runtime/data-stream-mgr-base.h  |  9 +++++----
 be/src/runtime/data-stream-mgr.cc      | 17 ++++++++---------
 be/src/runtime/data-stream-mgr.h       | 11 ++++++-----
 be/src/runtime/data-stream-test.cc     | 13 ++++++-------
 be/src/runtime/krpc-data-stream-mgr.cc | 13 ++++++-------
 be/src/runtime/krpc-data-stream-mgr.h  | 11 ++++++-----
 tests/query_test/test_observability.py |  2 ++
 8 files changed, 42 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 353a59b..cc39382 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -81,9 +81,9 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
 
   // TODO: figure out appropriate buffer size
   DCHECK_GT(num_senders_, 0);
-  stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(state,
-      &input_row_desc_, state->fragment_instance_id(), id_, num_senders_,
-      FLAGS_exchg_node_buffer_size_bytes, runtime_profile(), is_merging_);
+  stream_recvr_ = 
ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(&input_row_desc_,
+      state->fragment_instance_id(), id_, num_senders_,
+      FLAGS_exchg_node_buffer_size_bytes, is_merging_, runtime_profile(), 
mem_tracker());
   if (is_merging_) {
     less_than_.reset(
         new TupleRowComparator(ordering_exprs_, is_asc_order_, nulls_first_));

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/runtime/data-stream-mgr-base.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr-base.h 
b/be/src/runtime/data-stream-mgr-base.h
index 0e392e3..f9761cb 100644
--- a/be/src/runtime/data-stream-mgr-base.h
+++ b/be/src/runtime/data-stream-mgr-base.h
@@ -26,6 +26,7 @@
 namespace impala {
 
 class DataStreamRecvrBase;
+class MemTracker;
 class RuntimeProfile;
 class RuntimeState;
 class TRowBatch;
@@ -43,10 +44,10 @@ class DataStreamMgrBase : public CacheLineAligned {
   virtual ~DataStreamMgrBase() { }
 
   /// Create a receiver for a specific fragment_instance_id/node_id 
destination;
-  virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-      RuntimeProfile* profile, bool is_merging) = 0;
+  virtual std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const 
RowDescriptor* row_desc,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
+      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+      MemTracker* parent_tracker) = 0;
 
   /// Closes all receivers registered for fragment_instance_id immediately.
   virtual void Cancel(const TUniqueId& fragment_instance_id) = 0;

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc 
b/be/src/runtime/data-stream-mgr.cc
index 45eee7f..48a819c 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -75,17 +75,16 @@ inline uint32_t DataStreamMgr::GetHashValue(
   return value;
 }
 
-shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(RuntimeState* state,
-    const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-    PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-    RuntimeProfile* profile, bool is_merging) {
-  DCHECK(profile != NULL);
+shared_ptr<DataStreamRecvrBase> DataStreamMgr::CreateRecvr(const 
RowDescriptor* row_desc,
+    const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
+    int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+    MemTracker* parent_tracker) {
+  DCHECK(profile != nullptr);
+  DCHECK(parent_tracker != nullptr);
   VLOG_FILE << "creating receiver for fragment="
             << fragment_instance_id << ", node=" << dest_node_id;
-  shared_ptr<DataStreamRecvr> recvr(
-      new DataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
-          fragment_instance_id, dest_node_id, num_senders, is_merging, 
buffer_size,
-          profile));
+  shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr(this, parent_tracker, 
row_desc,
+      fragment_instance_id, dest_node_id, num_senders, is_merging, 
buffer_size, profile));
   size_t hash_value = GetHashValue(fragment_instance_id, dest_node_id);
   lock_guard<mutex> l(lock_);
   fragment_recvr_set_.insert(make_pair(fragment_instance_id, dest_node_id));

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 07f7c56..2be6478 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -71,13 +71,14 @@ class DataStreamMgr : public DataStreamMgrBase {
   /// Create a receiver for a specific fragment_instance_id/node_id 
destination;
   /// If is_merging is true, the receiver maintains a separate queue of 
incoming row
   /// batches for each sender and merges the sorted streams from each sender 
into a
-  /// single stream.
+  /// single stream. 'parent_tracker' is the MemTracker of the exchange node 
which owns
+  /// this receiver. It's the parent of the MemTracker of the newly created 
receiver.
   /// Ownership of the receiver is shared between this DataStream mgr instance 
and the
   /// caller.
-  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-      RuntimeProfile* profile, bool is_merging) override;
+  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* 
row_desc,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
+      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+      MemTracker* parent_tracker) override;
 
   /// Adds a row batch to the recvr identified by 
fragment_instance_id/dest_node_id
   /// if the recvr has not been cancelled. sender_id identifies the sender 
instance

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc 
b/be/src/runtime/data-stream-test.cc
index 07eefd4..75d5ac9 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -415,8 +415,8 @@ class DataStreamTest : public 
DataStreamTestBase<testing::TestWithParam<KrpcSwit
     GetNextInstanceId(&instance_id);
     receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, 
receiver_num));
     ReceiverInfo& info = receiver_info_.back();
-    info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), 
row_desc_,
-        instance_id, DEST_NODE_ID, num_senders, buffer_size, profile, 
is_merging);
+    info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, 
DEST_NODE_ID,
+        num_senders, buffer_size, is_merging, profile, &tracker_);
     if (!is_merging) {
       info.thread_handle = new thread(&DataStreamTest::ReadStream, this, 
&info);
     } else {
@@ -767,9 +767,8 @@ TEST_P(DataStreamTestThriftOnly, 
CloseRecvrWhileReferencesRemain) {
   // Start just one receiver.
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
-  shared_ptr<DataStreamRecvrBase> stream_recvr = stream_mgr_->CreateRecvr(
-      runtime_state.get(), row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile,
-      false);
+  shared_ptr<DataStreamRecvrBase> stream_recvr = 
stream_mgr_->CreateRecvr(row_desc_,
+      instance_id, DEST_NODE_ID, 1, 1, false, profile, &tracker_);
 
   // Perform tear down, but keep a reference to the receiver so that it is 
deleted last
   // (to confirm that the destructor does not access invalid state after 
tear-down).
@@ -832,8 +831,8 @@ TEST_P(DataStreamTestForImpala6346, TestNoDeadlock) {
   RuntimeProfile* profile = RuntimeProfile::Create(&obj_pool_, "TestReceiver");
   receiver_info_.push_back(ReceiverInfo(TPartitionType::UNPARTITIONED, 4, 1));
   ReceiverInfo& info = receiver_info_.back();
-  info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), row_desc_,
-      instance_id, DEST_NODE_ID, 4, 1024 * 1024, profile, false);
+  info.stream_recvr = stream_mgr_->CreateRecvr(row_desc_, instance_id, 
DEST_NODE_ID,
+      4, 1024 * 1024, false, profile, &tracker_);
   info.thread_handle = new thread(
       &DataStreamTestForImpala6346_TestNoDeadlock_Test::ReadStream, this, 
&info);
 

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/runtime/krpc-data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.cc 
b/be/src/runtime/krpc-data-stream-mgr.cc
index fabea13..91111dc 100644
--- a/be/src/runtime/krpc-data-stream-mgr.cc
+++ b/be/src/runtime/krpc-data-stream-mgr.cc
@@ -94,16 +94,15 @@ inline uint32_t KrpcDataStreamMgr::GetHashValue(
 }
 
 shared_ptr<DataStreamRecvrBase> KrpcDataStreamMgr::CreateRecvr(
-    RuntimeState* state, const RowDescriptor* row_desc,
-    const TUniqueId& finst_id, PlanNodeId dest_node_id, int num_senders,
-    int64_t buffer_size, RuntimeProfile* profile, bool is_merging) {
-
+    const RowDescriptor* row_desc, const TUniqueId& finst_id, PlanNodeId 
dest_node_id,
+    int num_senders, int64_t buffer_size, bool is_merging, RuntimeProfile* 
profile,
+    MemTracker* parent_tracker) {
   DCHECK(profile != nullptr);
+  DCHECK(parent_tracker != nullptr);
   VLOG_FILE << "creating receiver for fragment="<< finst_id
             << ", node=" << dest_node_id;
-  shared_ptr<KrpcDataStreamRecvr> recvr(
-      new KrpcDataStreamRecvr(this, state->instance_mem_tracker(), row_desc,
-          finst_id, dest_node_id, num_senders, is_merging, buffer_size, 
profile));
+  shared_ptr<KrpcDataStreamRecvr> recvr(new KrpcDataStreamRecvr(this, 
parent_tracker,
+      row_desc, finst_id, dest_node_id, num_senders, is_merging, buffer_size, 
profile));
   uint32_t hash_value = GetHashValue(finst_id, dest_node_id);
   EarlySendersList early_senders_for_recvr;
   {

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/be/src/runtime/krpc-data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/krpc-data-stream-mgr.h 
b/be/src/runtime/krpc-data-stream-mgr.h
index 458ebe7..16c0b30 100644
--- a/be/src/runtime/krpc-data-stream-mgr.h
+++ b/be/src/runtime/krpc-data-stream-mgr.h
@@ -236,13 +236,14 @@ class KrpcDataStreamMgr : public DataStreamMgrBase {
   /// Create a receiver for a specific fragment_instance_id/dest_node_id.
   /// If is_merging is true, the receiver maintains a separate queue of 
incoming row
   /// batches for each sender and merges the sorted streams from each sender 
into a
-  /// single stream.
+  /// single stream. 'parent_tracker' is the MemTracker of the exchange node 
which owns
+  /// this receiver. It's the parent of the MemTracker of the newly created 
receiver.
   /// Ownership of the receiver is shared between this DataStream mgr instance 
and the
   /// caller.
-  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(RuntimeState* state,
-      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
-      PlanNodeId dest_node_id, int num_senders, int64_t buffer_size,
-      RuntimeProfile* profile, bool is_merging) override;
+  std::shared_ptr<DataStreamRecvrBase> CreateRecvr(const RowDescriptor* 
row_desc,
+      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int 
num_senders,
+      int64_t buffer_size, bool is_merging, RuntimeProfile* profile,
+      MemTracker* parent_tracker) override;
 
   /// Handler for TransmitData() RPC.
   ///

http://git-wip-us.apache.org/repos/asf/impala/blob/c3e99332/tests/query_test/test_observability.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_observability.py 
b/tests/query_test/test_observability.py
index 85fc4f1..9c0bb65 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -38,6 +38,7 @@ class TestObservability(ImpalaTestSuite):
     assert result.exec_summary[0]['operator'] == '05:MERGING-EXCHANGE'
     assert result.exec_summary[0]['num_rows'] == 5
     assert result.exec_summary[0]['est_num_rows'] == 5
+    assert result.exec_summary[0]['peak_mem'] > 0
 
     for line in result.runtime_profile.split('\n'):
       # The first 'RowsProduced' we find is for the coordinator fragment.
@@ -55,6 +56,7 @@ class TestObservability(ImpalaTestSuite):
     assert result.exec_summary[5]['operator'] == '04:EXCHANGE'
     assert result.exec_summary[5]['num_rows'] == 25
     assert result.exec_summary[5]['est_num_rows'] == 25
+    assert result.exec_summary[5]['peak_mem'] > 0
 
   @SkipIfS3.hbase
   @SkipIfLocal.hbase

Reply via email to