IMPALA-4392: restore PeakMemoryUsage to DataSink profiles

The join build sink patches refactored the DataSink interface and
inadvertently removed the PeakMemoryUsage counter from the sink profiles.
The cause was that the sink's MemTracker was not initialized with the
sink's profile.

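The fix is for each sink to create its own MemTracker in
DataSink::Prepare(), initialized with the sink's profile and registered
as a child of the caller-provided parent MemTracker.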

Testing:
Ran core tests. Manually checked the profile to make sure the counter
appeared under HdfsTableSink, DataStreamSender, etc.

Change-Id: Iaa5db623a84c47d5904033ec26aece74f500a2c9
Reviewed-on: http://gerrit.cloudera.org:8080/4969
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/0ab3d769
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/0ab3d769
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/0ab3d769

Branch: refs/heads/master
Commit: 0ab3d7691e95bc16369779e334b75c328dd40602
Parents: 77a2941
Author: Tim Armstrong <[email protected]>
Authored: Tue Nov 8 11:16:07 2016 -0800
Committer: Internal Jenkins <[email protected]>
Committed: Thu Nov 17 04:37:56 2016 +0000

----------------------------------------------------------------------
 be/src/exec/data-sink.cc                     | 13 +++++++++----
 be/src/exec/data-sink.h                      | 14 ++++++--------
 be/src/exec/hbase-table-sink.cc              |  4 ++--
 be/src/exec/hbase-table-sink.h               |  5 ++---
 be/src/exec/hdfs-table-sink.cc               |  4 ++--
 be/src/exec/hdfs-table-sink.h                |  2 +-
 be/src/exec/kudu-table-sink.cc               |  4 ++--
 be/src/exec/kudu-table-sink.h                |  2 +-
 be/src/exec/nested-loop-join-builder.cc      | 12 +++++-------
 be/src/exec/nested-loop-join-builder.h       |  8 +++++---
 be/src/exec/nested-loop-join-node.cc         |  6 +++---
 be/src/exec/partitioned-hash-join-builder.cc |  9 +++++----
 be/src/exec/partitioned-hash-join-builder.h  |  2 +-
 be/src/exec/plan-root-sink.cc                |  4 ++--
 be/src/exec/row-batch-cache.h                | 18 ++++++------------
 be/src/runtime/data-stream-sender.cc         | 19 +++++++++----------
 be/src/runtime/data-stream-sender.h          |  2 +-
 be/src/runtime/plan-fragment-executor.cc     |  9 ++-------
 be/src/runtime/plan-fragment-executor.h      |  1 -
 19 files changed, 64 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 17f1f8c..967eda3 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -176,12 +176,13 @@ string DataSink::OutputDmlStats(const PartitionStatusMap& 
stats,
   return ss.str();
 }
 
-Status DataSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
-  DCHECK(mem_tracker != NULL);
+Status DataSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) {
+  DCHECK(parent_mem_tracker != NULL);
   profile_ = state->obj_pool()->Add(new RuntimeProfile(state->obj_pool(), 
GetName()));
-  mem_tracker_ = mem_tracker;
+  const string& name = GetName();
+  mem_tracker_.reset(new MemTracker(profile_, -1, name, parent_mem_tracker));
   expr_mem_tracker_.reset(
-      new MemTracker(-1, Substitute("$0 Exprs", GetName()), mem_tracker, 
false));
+      new MemTracker(-1, Substitute("$0 Exprs", name), mem_tracker_.get(), 
false));
   return Status::OK();
 }
 
@@ -191,6 +192,10 @@ void DataSink::Close(RuntimeState* state) {
     expr_mem_tracker_->UnregisterFromParent();
     expr_mem_tracker_.reset();
   }
+  if (mem_tracker_ != NULL) {
+    mem_tracker_->UnregisterFromParent();
+    mem_tracker_.reset();
+  }
   closed_ = true;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index 1970cf2..0fc8eca 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -58,9 +58,9 @@ class DataSink {
   virtual std::string GetName() = 0;
 
   /// Setup. Call before Send(), Open(), or Close() during the prepare phase 
of the query
-  /// fragment. Any memory allocated will be tracked against the 
caller-provided
-  /// 'mem_tracker'. Subclasses must call DataSink::Prepare().
-  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker);
+  /// fragment. Creates a MemTracker for the sink that is a child of 
'parent_mem_tracker'.
+  /// Subclasses must call DataSink::Prepare().
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
 
   /// Call before Send() to open the sink.
   virtual Status Open(RuntimeState* state) = 0;
@@ -94,7 +94,7 @@ class DataSink {
   static std::string OutputDmlStats(const PartitionStatusMap& stats,
       const std::string& prefix = "");
 
-  MemTracker* mem_tracker() const { return mem_tracker_; }
+  MemTracker* mem_tracker() const { return mem_tracker_.get(); }
   RuntimeProfile* profile() const { return profile_; }
 
  protected:
@@ -109,13 +109,11 @@ class DataSink {
   RuntimeProfile* profile_;
 
   /// The MemTracker for all allocations made by the DataSink. Initialized in 
Prepare().
-  /// Not owned.
-  MemTracker* mem_tracker_;
+  boost::scoped_ptr<MemTracker> mem_tracker_;
 
   /// A child of 'mem_tracker_' that tracks expr allocations. Initialized in 
Prepare().
   boost::scoped_ptr<MemTracker> expr_mem_tracker_;
-
 };
 
-}  // namespace impala
+} // namespace impala
 #endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index 3d84fed..a89384e 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -53,8 +53,8 @@ Status HBaseTableSink::PrepareExprs(RuntimeState* state) {
   return Status::OK();
 }
 
-Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
-  RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
+Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   SCOPED_TIMER(profile()->total_time_counter());
 
   // Get the hbase table descriptor.  The table name will be used.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/hbase-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.h b/be/src/exec/hbase-table-sink.h
index 7e40033..3326ddb 100644
--- a/be/src/exec/hbase-table-sink.h
+++ b/be/src/exec/hbase-table-sink.h
@@ -36,10 +36,9 @@ namespace impala {
 class HBaseTableSink : public DataSink {
  public:
   HBaseTableSink(const RowDescriptor& row_desc,
-                 const std::vector<TExpr>& select_list_texprs,
-                 const TDataSink& tsink);
+      const std::vector<TExpr>& select_list_texprs, const TDataSink& tsink);
   virtual std::string GetName() { return "HBaseTableSink"; }
-  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker);
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
   virtual Status Open(RuntimeState* state);
   virtual Status Send(RuntimeState* state, RowBatch* batch);
   virtual Status FlushFinal(RuntimeState* state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 07c1167..8c74797 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -104,8 +104,8 @@ Status HdfsTableSink::PrepareExprs(RuntimeState* state) {
   return Status::OK();
 }
 
-Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
-  RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
+Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   unique_id_str_ = PrintId(state->fragment_instance_id(), "-");
   SCOPED_TIMER(profile()->total_time_counter());
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index bb2f9d7..ae32270 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -133,7 +133,7 @@ class HdfsTableSink : public DataSink {
   virtual std::string GetName() { return "HdfsTableSink"; }
 
   /// Prepares output_exprs and partition_key_exprs, and connects to HDFS.
-  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker);
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
 
   /// Opens output_exprs and partition_key_exprs, prepares the single output 
partition for
   /// static inserts, and populates partition_descriptor_map_.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 11613aa..5dd6336 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -77,8 +77,8 @@ Status KuduTableSink::PrepareExprs(RuntimeState* state) {
   return Status::OK();
 }
 
-Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
-  RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
+Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   SCOPED_TIMER(profile()->total_time_counter());
   RETURN_IF_ERROR(PrepareExprs(state));
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/kudu-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 3ff667c..2f539bc 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -60,7 +60,7 @@ class KuduTableSink : public DataSink {
 
   /// Prepares the expressions to be applied and creates a KuduSchema based on 
the
   /// expressions and KuduTableDescriptor.
-  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker);
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
 
   /// Connects to Kudu and creates the KuduSession to be used for the writes.
   virtual Status Open(RuntimeState* state);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/nested-loop-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-builder.cc 
b/be/src/exec/nested-loop-join-builder.cc
index 0fca3b5..5631df4 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -27,13 +27,11 @@
 
 using namespace impala;
 
-NljBuilder::NljBuilder(const RowDescriptor& row_desc, RuntimeState* state,
-    MemTracker* mem_tracker)
-    : DataSink(row_desc), build_batch_cache_(row_desc, state->batch_size(),
-      mem_tracker) {}
+NljBuilder::NljBuilder(const RowDescriptor& row_desc, RuntimeState* state)
+  : DataSink(row_desc), build_batch_cache_(row_desc, state->batch_size()) {}
 
-Status NljBuilder::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
-  RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
+Status NljBuilder::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   return Status::OK();
 }
 
@@ -89,7 +87,7 @@ Status NljBuilder::DeepCopyBuildBatches(RuntimeState* state) {
     RowBatch* input_batch = *it;
     // TODO: it would be more efficient to do the deep copy within the same 
batch, rather
     // than to a new batch.
-    RowBatch* copied_batch = build_batch_cache_.GetNextBatch();
+    RowBatch* copied_batch = build_batch_cache_.GetNextBatch(mem_tracker());
     input_batch->DeepCopyTo(copied_batch);
     copied_build_batches_.AddRowBatch(copied_batch);
     // Reset input batches as we go to free up memory if possible.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/nested-loop-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-builder.h 
b/be/src/exec/nested-loop-join-builder.h
index 3fb5348..86f497b 100644
--- a/be/src/exec/nested-loop-join-builder.h
+++ b/be/src/exec/nested-loop-join-builder.h
@@ -37,11 +37,11 @@ namespace impala {
 /// is used and all data is deep copied into memory owned by the builder.
 class NljBuilder : public DataSink {
  public:
-  NljBuilder(const RowDescriptor& row_desc, RuntimeState* state, MemTracker* 
mem_tracker);
+  NljBuilder(const RowDescriptor& row_desc, RuntimeState* state);
 
   /// Implementations of DataSink interface methods.
   virtual std::string GetName() override { return "Nested Loop Join Builder"; }
-  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker) 
override;
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) 
override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status Send(RuntimeState* state, RowBatch* batch) override;
   virtual Status FlushFinal(RuntimeState* state) override;
@@ -52,7 +52,9 @@ class NljBuilder : public DataSink {
 
   /// Returns the next build batch that should be filled and passed to 
AddBuildBatch().
   /// Exposed so that NestedLoopJoinNode can bypass the DataSink interface for 
efficiency.
-  inline RowBatch* GetNextEmptyBatch() { return 
build_batch_cache_.GetNextBatch(); }
+  inline RowBatch* GetNextEmptyBatch() {
+    return build_batch_cache_.GetNextBatch(mem_tracker_.get());
+  }
 
   /// Add a batch to the build side. Does not copy the data, so either 
resources must
   /// be owned by the batch (or a later batch), or DeepCopyBuildBatches() must 
be called

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc 
b/be/src/exec/nested-loop-join-node.cc
index b640e58..caa57b4 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -99,10 +99,10 @@ Status NestedLoopJoinNode::Prepare(RuntimeState* state) {
   // join_conjunct_ctxs_ are evaluated in the context of rows assembled from
   // all inner and outer tuples.
   RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc());
-  RETURN_IF_ERROR(Expr::Prepare(
-      join_conjunct_ctxs_, state, full_row_desc, expr_mem_tracker()));
+  RETURN_IF_ERROR(
+      Expr::Prepare(join_conjunct_ctxs_, state, full_row_desc, 
expr_mem_tracker()));
 
-  builder_.reset(new NljBuilder(child(1)->row_desc(), state, mem_tracker()));
+  builder_.reset(new NljBuilder(child(1)->row_desc(), state));
   RETURN_IF_ERROR(builder_->Prepare(state, mem_tracker()));
   runtime_profile()->PrependChild(builder_->profile());
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc 
b/be/src/exec/partitioned-hash-join-builder.cc
index 8ee8394..e577b22 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -108,8 +108,8 @@ string PhjBuilder::GetName() {
   return Substitute("Hash Join Builder (join_node_id=$0)", join_node_id_);
 }
 
-Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
-  RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
+Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   RETURN_IF_ERROR(
       Expr::Prepare(build_expr_ctxs_, state, row_desc_, 
expr_mem_tracker_.get()));
   expr_ctxs_to_free_.insert(
@@ -121,10 +121,11 @@ Status PhjBuilder::Prepare(RuntimeState* state, 
MemTracker* mem_tracker) {
   }
   RETURN_IF_ERROR(HashTableCtx::Create(state, build_expr_ctxs_, 
build_expr_ctxs_,
       HashTableStoresNulls(), is_not_distinct_from_, 
state->fragment_hash_seed(),
-      MAX_PARTITION_DEPTH, row_desc_.tuple_descriptors().size(), mem_tracker_, 
&ht_ctx_));
+      MAX_PARTITION_DEPTH, row_desc_.tuple_descriptors().size(), 
mem_tracker_.get(),
+      &ht_ctx_));
   RETURN_IF_ERROR(state->block_mgr()->RegisterClient(
       Substitute("PartitionedHashJoin id=$0 builder=$1", join_node_id_, this),
-      MinRequiredBuffers(), true, mem_tracker, state, &block_mgr_client_));
+      MinRequiredBuffers(), true, mem_tracker_.get(), state, 
&block_mgr_client_));
 
   partitions_created_ = ADD_COUNTER(profile(), "PartitionsCreated", 
TUnit::UNIT);
   largest_partition_percent_ =

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h 
b/be/src/exec/partitioned-hash-join-builder.h
index 650452c..48f4c88 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -78,7 +78,7 @@ class PhjBuilder : public DataSink {
 
   /// Implementations of DataSink interface methods.
   virtual std::string GetName() override;
-  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker) 
override;
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) 
override;
   virtual Status Open(RuntimeState* state) override;
   virtual Status Send(RuntimeState* state, RowBatch* batch) override;
   virtual Status FlushFinal(RuntimeState* state) override;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 5f6cd75..f513800 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -38,8 +38,8 @@ PlanRootSink::PlanRootSink(const RowDescriptor& row_desc,
     const std::vector<TExpr>& output_exprs, const TDataSink& thrift_sink)
   : DataSink(row_desc), thrift_output_exprs_(output_exprs) {}
 
-Status PlanRootSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
-  RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
+Status PlanRootSink::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   RETURN_IF_ERROR(
       Expr::CreateExprTrees(state->obj_pool(), thrift_output_exprs_, 
&output_expr_ctxs_));
   RETURN_IF_ERROR(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/exec/row-batch-cache.h
----------------------------------------------------------------------
diff --git a/be/src/exec/row-batch-cache.h b/be/src/exec/row-batch-cache.h
index 81ae25c..83d0ae9 100644
--- a/be/src/exec/row-batch-cache.h
+++ b/be/src/exec/row-batch-cache.h
@@ -34,23 +34,18 @@ class MemTracker;
 /// Simple cache of row batches.
 class RowBatchCache {
  public:
-  RowBatchCache(const RowDescriptor& row_desc, int batch_size, MemTracker* 
mem_tracker)
-    : row_desc_(row_desc),
-      batch_size_(batch_size),
-      mem_tracker_(mem_tracker),
-      next_row_batch_idx_(0) {
-  }
+  RowBatchCache(const RowDescriptor& row_desc, int batch_size)
+    : row_desc_(row_desc), batch_size_(batch_size), next_row_batch_idx_(0) {}
 
-  ~RowBatchCache() {
-    DCHECK_EQ(0, row_batches_.size());
-  }
+  ~RowBatchCache() { DCHECK_EQ(0, row_batches_.size()); }
 
   /// Returns the next batch from the cache. Expands the cache if necessary.
-  RowBatch* GetNextBatch() {
+  /// If a new batch is created, its memory is tracked against 'mem_tracker'.
+  RowBatch* GetNextBatch(MemTracker* mem_tracker) {
     if (next_row_batch_idx_ >= row_batches_.size()) {
       // Expand the cache with a new row batch.
       row_batches_.push_back(
-          std::make_unique<RowBatch>(row_desc_, batch_size_, mem_tracker_));
+          std::make_unique<RowBatch>(row_desc_, batch_size_, mem_tracker));
     } else {
       // Reset batch from the cache before returning it.
       row_batches_[next_row_batch_idx_]->Reset();
@@ -79,7 +74,6 @@ class RowBatchCache {
   /// Parameters needed for creating row batches.
   const RowDescriptor& row_desc_;
   int batch_size_;
-  MemTracker* mem_tracker_; // not owned
 
   /// List of cached row-batch objects. The row-batch objects are owned by 
this cache.
   std::vector<std::unique_ptr<RowBatch>> row_batches_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc 
b/be/src/runtime/data-stream-sender.cc
index f80aee5..9e0ddc7 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -381,25 +381,24 @@ DataStreamSender::~DataStreamSender() {
   }
 }
 
-Status DataStreamSender::Prepare(RuntimeState* state, MemTracker* mem_tracker) 
{
-  RETURN_IF_ERROR(DataSink::Prepare(state, mem_tracker));
+Status DataStreamSender::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   state_ = state;
   SCOPED_TIMER(profile_->total_time_counter());
 
-  RETURN_IF_ERROR(Expr::Prepare(partition_expr_ctxs_, state, row_desc_, 
mem_tracker));
+  RETURN_IF_ERROR(
+      Expr::Prepare(partition_expr_ctxs_, state, row_desc_, 
mem_tracker_.get()));
 
-  bytes_sent_counter_ =
-      ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+  bytes_sent_counter_ = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
   uncompressed_bytes_counter_ =
       ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
-  serialize_batch_timer_ =
-      ADD_TIMER(profile(), "SerializeBatchTime");
-  thrift_transmit_timer_ = 
profile()->AddConcurrentTimerCounter("TransmitDataRPCTime",
-      TUnit::TIME_NS);
+  serialize_batch_timer_ = ADD_TIMER(profile(), "SerializeBatchTime");
+  thrift_transmit_timer_ =
+      profile()->AddConcurrentTimerCounter("TransmitDataRPCTime", 
TUnit::TIME_NS);
   network_throughput_ =
       profile()->AddDerivedCounter("NetworkThroughput(*)", 
TUnit::BYTES_PER_SECOND,
           bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,
-                        thrift_transmit_timer_));
+                                       thrift_transmit_timer_));
   overall_throughput_ =
       profile()->AddDerivedCounter("OverallThroughput", 
TUnit::BYTES_PER_SECOND,
            bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_sent_counter_,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/runtime/data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.h 
b/be/src/runtime/data-stream-sender.h
index a8472ad..60b9025 100644
--- a/be/src/runtime/data-stream-sender.h
+++ b/be/src/runtime/data-stream-sender.h
@@ -67,7 +67,7 @@ class DataStreamSender : public DataSink {
 
   /// Must be called before other API calls, and before the codegen'd IR 
module is
   /// compiled (i.e. in an ExecNode's Prepare() function).
-  virtual Status Prepare(RuntimeState* state, MemTracker* mem_tracker);
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
 
   /// Must be called before Send() or Close(), and after the codegen'd IR 
module is
   /// compiled (i.e. in an ExecNode's Open() function).

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/runtime/plan-fragment-executor.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.cc 
b/be/src/runtime/plan-fragment-executor.cc
index c7b580e..12eb03d 100644
--- a/be/src/runtime/plan-fragment-executor.cc
+++ b/be/src/runtime/plan-fragment-executor.cc
@@ -225,9 +225,8 @@ Status PlanFragmentExecutor::PrepareInternal(const 
TExecPlanFragmentParams& requ
       DataSink::CreateDataSink(obj_pool(), 
request.fragment_ctx.fragment.output_sink,
           request.fragment_ctx.fragment.output_exprs, fragment_instance_ctx,
           exec_tree_->row_desc(), &sink_));
-  sink_mem_tracker_.reset(
-      new MemTracker(-1, sink_->GetName(), 
runtime_state_->instance_mem_tracker(), true));
-  RETURN_IF_ERROR(sink_->Prepare(runtime_state(), sink_mem_tracker_.get()));
+  RETURN_IF_ERROR(
+      sink_->Prepare(runtime_state(), runtime_state_->instance_mem_tracker()));
 
   RuntimeProfile* sink_profile = sink_->profile();
   if (sink_profile != NULL) {
@@ -536,10 +535,6 @@ void PlanFragmentExecutor::Close() {
   if (sink_.get() != nullptr) sink_->Close(runtime_state());
 
   row_batch_.reset();
-  if (sink_mem_tracker_ != NULL) {
-    sink_mem_tracker_->UnregisterFromParent();
-    sink_mem_tracker_.reset();
-  }
 
   // Prepare should always have been called, and so runtime_state_ should be 
set
   DCHECK(prepared_promise_.IsSet());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0ab3d769/be/src/runtime/plan-fragment-executor.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/plan-fragment-executor.h 
b/be/src/runtime/plan-fragment-executor.h
index 6385213..7149272 100644
--- a/be/src/runtime/plan-fragment-executor.h
+++ b/be/src/runtime/plan-fragment-executor.h
@@ -200,7 +200,6 @@ class PlanFragmentExecutor {
   /// Output sink for rows sent to this fragment. Created in Prepare(), owned 
by this
   /// object.
   boost::scoped_ptr<DataSink> sink_;
-  boost::scoped_ptr<MemTracker> sink_mem_tracker_;
 
   /// Set if this fragment instance is the root of the entire plan, so that a 
consumer can
   /// pull results by calling root_sink_->GetNext(). Same object as sink_.

Reply via email to