IMPALA-5481: Clarify RowDescriptor ownership

RowDescriptors are originally allocated in-line with the exec node that
uses them. Their lifetime is therefore guaranteed to be as long as the
parent fragment instance. However, we copy the RowDescriptors into
RowBatches, which adds allocation pressure (RowDescriptors contain
vectors), and is unnecessary as a) the descriptor is constant and b)
RowBatches get destroyed before exec nodes.

This patch standardises ownership of RowDescriptor objects, by changing
members that were copies or const references to RowDescriptors to be
const RowDescriptor*. Method arguments are either const* to convey that
ownership is to be shared, or const& to convey that the descriptor is to
be used but not mutated by the callee.

The tradeoff of fewer allocations appears to outweigh any loss of cache
locality due to sharing the RowDescriptor. On a 16-node cluster that
previously spend ~20% of its tcmalloc time allocating RowDescriptors,
this patch reduced that time to 0%.

Change-Id: I2fc39170f775581d406b6a97445046f508d8d75f
Reviewed-on: http://gerrit.cloudera.org:8080/7206
Reviewed-by: Henry Robinson <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/317c413a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/317c413a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/317c413a

Branch: refs/heads/master
Commit: 317c413a00bd9b3b29eeaf2efe556c2e924e2d74
Parents: 352ad55
Author: Henry Robinson <[email protected]>
Authored: Wed Jun 14 11:49:05 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Jun 21 02:07:25 2017 +0000

----------------------------------------------------------------------
 .../benchmarks/row-batch-serialize-benchmark.cc | 29 +++++-----
 be/src/exec/aggregation-node.cc                 |  6 +-
 be/src/exec/analytic-eval-node.cc               | 11 ++--
 be/src/exec/blocking-join-node.cc               | 24 ++++----
 be/src/exec/data-sink.cc                        |  8 +--
 be/src/exec/data-sink.h                         |  9 +--
 be/src/exec/exchange-node.cc                    |  4 +-
 be/src/exec/exec-node.h                         | 11 +++-
 be/src/exec/hash-join-node.cc                   | 26 ++++-----
 be/src/exec/hbase-table-sink.cc                 |  2 +-
 be/src/exec/hbase-table-sink.h                  |  2 +-
 be/src/exec/hdfs-parquet-scanner.cc             | 38 ++++++-------
 be/src/exec/hdfs-scan-node-base.cc              |  6 +-
 be/src/exec/hdfs-table-sink.cc                  |  4 +-
 be/src/exec/hdfs-table-sink.h                   |  2 +-
 be/src/exec/kudu-scanner.cc                     |  6 +-
 be/src/exec/kudu-table-sink.cc                  | 10 ++--
 be/src/exec/kudu-table-sink.h                   |  2 +-
 be/src/exec/nested-loop-join-builder.cc         |  3 +-
 be/src/exec/nested-loop-join-builder.h          |  2 +-
 be/src/exec/nested-loop-join-node.cc            | 12 ++--
 be/src/exec/partitioned-aggregation-node.cc     | 27 ++++-----
 be/src/exec/partitioned-aggregation-node.h      |  2 +-
 be/src/exec/partitioned-hash-join-builder.cc    | 14 ++---
 be/src/exec/partitioned-hash-join-builder.h     |  6 +-
 be/src/exec/partitioned-hash-join-node.cc       | 18 +++---
 be/src/exec/plan-root-sink.cc                   |  5 +-
 be/src/exec/plan-root-sink.h                    |  2 +-
 be/src/exec/row-batch-cache.h                   |  4 +-
 be/src/exec/row-batch-list-test.cc              |  6 +-
 be/src/exec/sort-node.cc                        |  2 +-
 be/src/exec/topn-node.cc                        |  2 +-
 be/src/exec/union-node.cc                       |  6 +-
 be/src/exec/unnest-node.cc                      | 10 ++--
 be/src/runtime/buffered-tuple-stream-test.cc    | 60 ++++++++++----------
 be/src/runtime/buffered-tuple-stream-v2-test.cc | 58 +++++++++----------
 be/src/runtime/buffered-tuple-stream-v2.cc      | 16 +++---
 be/src/runtime/buffered-tuple-stream-v2.h       |  4 +-
 be/src/runtime/buffered-tuple-stream.cc         | 27 ++++-----
 be/src/runtime/buffered-tuple-stream.h          |  4 +-
 be/src/runtime/data-stream-mgr.cc               |  2 +-
 be/src/runtime/data-stream-mgr.h                |  7 +--
 be/src/runtime/data-stream-recvr.cc             |  6 +-
 be/src/runtime/data-stream-recvr.h              |  8 +--
 be/src/runtime/data-stream-sender.cc            | 22 ++++---
 be/src/runtime/data-stream-sender.h             |  2 +-
 be/src/runtime/data-stream-test.cc              | 10 ++--
 be/src/runtime/descriptors.cc                   |  2 +-
 be/src/runtime/descriptors.h                    |  6 +-
 be/src/runtime/row-batch-serialize-test.cc      | 16 +++---
 be/src/runtime/row-batch-test.cc                |  8 +--
 be/src/runtime/row-batch.cc                     | 45 ++++++++-------
 be/src/runtime/row-batch.h                      | 13 +++--
 be/src/runtime/sorted-run-merger.cc             |  2 +-
 be/src/runtime/sorted-run-merger.h              |  4 +-
 be/src/runtime/sorter.cc                        | 11 ++--
 be/src/util/debug-util.cc                       |  2 +-
 57 files changed, 330 insertions(+), 326 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/benchmarks/row-batch-serialize-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/row-batch-serialize-benchmark.cc 
b/be/src/benchmarks/row-batch-serialize-benchmark.cc
index 5a8a104..0099260 100644
--- a/be/src/benchmarks/row-batch-serialize-benchmark.cc
+++ b/be/src/benchmarks/row-batch-serialize-benchmark.cc
@@ -106,7 +106,7 @@ class RowBatchSerializeBaseline {
     output_batch->compression_type = THdfsCompression::NONE;
 
     output_batch->num_rows = batch->num_rows_;
-    batch->row_desc_.ToThrift(&output_batch->row_tuples);
+    batch->row_desc_->ToThrift(&output_batch->row_tuples);
     output_batch->tuple_offsets.reserve(batch->num_rows_ * 
batch->num_tuples_per_row_);
 
     int64_t size = TotalByteSize(batch);
@@ -152,9 +152,9 @@ class RowBatchSerializeBaseline {
     char* tuple_data = const_cast<char*>(output_batch->tuple_data.c_str());
 
     for (int i = 0; i < batch->num_rows_; ++i) {
-       vector<TupleDescriptor*>::const_iterator desc =
-         batch->row_desc_.tuple_descriptors().begin();
-      for (int j = 0; desc != batch->row_desc_.tuple_descriptors().end(); 
++desc, ++j) {
+      vector<TupleDescriptor*>::const_iterator desc =
+          batch->row_desc_->tuple_descriptors().begin();
+      for (int j = 0; desc != batch->row_desc_->tuple_descriptors().end(); 
++desc, ++j) {
         Tuple* tuple = batch->GetRow(i)->GetTuple(j);
         if (tuple == NULL) {
           // NULLs are encoded as -1
@@ -177,7 +177,7 @@ class RowBatchSerializeBaseline {
       for (int j = 0; j < batch->num_tuples_per_row_; ++j) {
         Tuple* tuple = batch->GetRow(i)->GetTuple(j);
         if (tuple == NULL) continue;
-        result += 
tuple->TotalByteSize(*batch->row_desc_.tuple_descriptors()[j]);
+        result += 
tuple->TotalByteSize(*batch->row_desc_->tuple_descriptors()[j]);
       }
     }
     return result;
@@ -223,11 +223,11 @@ class RowBatchSerializeBaseline {
     }
 
     // Check whether we have slots that require offset-to-pointer conversion.
-    if (!batch->row_desc_.HasVarlenSlots()) return;
+    if (!batch->row_desc_->HasVarlenSlots()) return;
 
     for (int i = 0; i < batch->num_rows_; ++i) {
       for (int j = 0; j < batch->num_tuples_per_row_; ++j) {
-        const TupleDescriptor* desc = batch->row_desc_.tuple_descriptors()[j];
+        const TupleDescriptor* desc = batch->row_desc_->tuple_descriptors()[j];
         if (!desc->HasVarlenSlots()) continue;
         Tuple* tuple = batch->GetRow(i)->GetTuple(j);
         if (tuple == NULL) continue;
@@ -244,7 +244,7 @@ class RowBatchSerializeBenchmark {
     srand(rand_seed);
     if (cycle <= 0) cycle = NUM_ROWS; // Negative means no repeats in cycle.
     MemPool* mem_pool = batch->tuple_data_pool();
-    const TupleDescriptor* tuple_desc = 
batch->row_desc().tuple_descriptors()[0];
+    const TupleDescriptor* tuple_desc = 
batch->row_desc()->tuple_descriptors()[0];
     int unique_tuples = (NUM_ROWS - 1) / repeats + 1;
     uint8_t* tuple_mem = mem_pool->Allocate(tuple_desc->byte_size() * 
unique_tuples);
     for (int i = 0; i < NUM_ROWS; ++i) {
@@ -307,15 +307,15 @@ class RowBatchSerializeBenchmark {
   static void TestDeserialize(int batch_size, void* data) {
     struct DeserializeArgs* args = reinterpret_cast<struct 
DeserializeArgs*>(data);
     for (int iter = 0; iter < batch_size; ++iter) {
-      RowBatch deserialized_batch(*args->row_desc, *args->trow_batch, 
args->tracker);
+      RowBatch deserialized_batch(args->row_desc, *args->trow_batch, 
args->tracker);
     }
   }
 
   static void TestDeserializeBaseline(int batch_size, void* data) {
     struct DeserializeArgs* args = reinterpret_cast<struct 
DeserializeArgs*>(data);
     for (int iter = 0; iter < batch_size; ++iter) {
-      RowBatch deserialized_batch(*args->row_desc, args->trow_batch->num_rows,
-          args->tracker);
+      RowBatch deserialized_batch(
+          args->row_desc, args->trow_batch->num_rows, args->tracker);
       RowBatchSerializeBaseline::Deserialize(&deserialized_batch, 
*args->trow_batch);
     }
   }
@@ -334,19 +334,18 @@ class RowBatchSerializeBenchmark {
     vector<TTupleId> tuple_id(1, (TTupleId) 0);
     RowDescriptor row_desc(*desc_tbl, tuple_id, nullable_tuples);
 
-    RowBatch* no_dup_batch = obj_pool.Add(new RowBatch(row_desc, NUM_ROWS, 
&tracker));
+    RowBatch* no_dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, 
&tracker));
     FillBatch(no_dup_batch, 12345, 1, -1);
     TRowBatch no_dup_tbatch;
     no_dup_batch->Serialize(&no_dup_tbatch);
 
     RowBatch* adjacent_dup_batch =
-        obj_pool.Add(new RowBatch(row_desc, NUM_ROWS, &tracker));
+        obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, &tracker));
     FillBatch(adjacent_dup_batch, 12345, 5, -1);
     TRowBatch adjacent_dup_tbatch;
     adjacent_dup_batch->Serialize(&adjacent_dup_tbatch, false);
 
-    RowBatch* dup_batch =
-        obj_pool.Add(new RowBatch(row_desc, NUM_ROWS, &tracker));
+    RowBatch* dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, 
&tracker));
     // Non-adjacent duplicates.
     FillBatch(dup_batch, 12345, 1, NUM_ROWS / 5);
     TRowBatch dup_tbatch;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/aggregation-node.cc b/be/src/exec/aggregation-node.cc
index 455b1c3..17820ff 100644
--- a/be/src/exec/aggregation-node.cc
+++ b/be/src/exec/aggregation-node.cc
@@ -79,7 +79,7 @@ Status AggregationNode::Init(const TPlanNode& tnode, 
RuntimeState* state) {
   DCHECK(output_tuple_desc_ != nullptr);
   RETURN_IF_ERROR(ExecNode::Init(tnode, state));
 
-  const RowDescriptor& row_desc = child(0)->row_desc();
+  const RowDescriptor& row_desc = *child(0)->row_desc();
   RETURN_IF_ERROR(ScalarExpr::Create(tnode.agg_node.grouping_exprs, row_desc, 
state,
       &grouping_exprs_));
   for (int i = 0; i < grouping_exprs_.size(); ++i) {
@@ -182,7 +182,7 @@ Status AggregationNode::Open(RuntimeState* state) {
     if (VLOG_ROW_IS_ON) {
       for (int i = 0; i < batch.num_rows(); ++i) {
         TupleRow* row = batch.GetRow(i);
-        VLOG_ROW << "input row: " << PrintRow(row, children_[0]->row_desc());
+        VLOG_ROW << "input row: " << PrintRow(row, *children_[0]->row_desc());
       }
     }
     if (process_row_batch_fn_ != nullptr) {
@@ -244,7 +244,7 @@ Status AggregationNode::GetNext(RuntimeState* state, 
RowBatch* row_batch, bool*
     output_iterator_.Next<false>();
     row->SetTuple(0, output_tuple);
     if (ExecNode::EvalConjuncts(evals, num_conjuncts, row)) {
-      VLOG_ROW << "output row: " << PrintRow(row, row_desc());
+      VLOG_ROW << "output row: " << PrintRow(row, *row_desc());
       row_batch->CommitLastRow();
       ++num_rows_returned_;
       if (ReachedLimit()) break;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/analytic-eval-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/analytic-eval-node.cc 
b/be/src/exec/analytic-eval-node.cc
index 2aabad1..b789188 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -116,7 +116,7 @@ Status AnalyticEvalNode::Init(const TPlanNode& tnode, 
RuntimeState* state) {
   for (int i = 0; i < analytic_node.analytic_functions.size(); ++i) {
     AggFn* analytic_fn;
     RETURN_IF_ERROR(AggFn::Create(analytic_node.analytic_functions[i],
-        child(0)->row_desc(), *(intermediate_tuple_desc_->slots()[i]),
+        *child(0)->row_desc(), *(intermediate_tuple_desc_->slots()[i]),
         *(result_tuple_desc_->slots()[i]), state, &analytic_fn));
     analytic_fns_.push_back(analytic_fn);
     DCHECK(!analytic_fn->is_merge());
@@ -134,7 +134,7 @@ Status AnalyticEvalNode::Init(const TPlanNode& tnode, 
RuntimeState* state) {
     DCHECK(analytic_node.__isset.buffered_tuple_id);
     DCHECK(buffered_tuple_desc_ != nullptr);
     vector<TTupleId> tuple_ids;
-    tuple_ids.push_back(child(0)->row_desc().tuple_descriptors()[0]->id());
+    tuple_ids.push_back(child(0)->row_desc()->tuple_descriptors()[0]->id());
     tuple_ids.push_back(buffered_tuple_desc_->id());
     RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, 
false));
 
@@ -153,7 +153,7 @@ Status AnalyticEvalNode::Init(const TPlanNode& tnode, 
RuntimeState* state) {
 Status AnalyticEvalNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(ExecNode::Prepare(state));
-  DCHECK(child(0)->row_desc().IsPrefixOf(row_desc()));
+  DCHECK(child(0)->row_desc()->IsPrefixOf(*row_desc()));
   curr_tuple_pool_.reset(new MemPool(mem_tracker()));
   prev_tuple_pool_.reset(new MemPool(mem_tracker()));
   mem_pool_.reset(new MemPool(mem_tracker()));
@@ -353,8 +353,7 @@ inline Status AnalyticEvalNode::AddRow(int64_t stream_idx, 
TupleRow* row) {
     if (window_.__isset.window_start) {
       VLOG_ROW << id() << " Adding tuple to window at idx=" << stream_idx;
       Tuple* tuple = row->GetTuple(0)->DeepCopy(
-          *child(0)->row_desc().tuple_descriptors()[0],
-          curr_tuple_pool_.get());
+          *child(0)->row_desc()->tuple_descriptors()[0], 
curr_tuple_pool_.get());
       window_tuples_.push_back(pair<int64_t, Tuple*>(stream_idx, tuple));
     }
   }
@@ -719,7 +718,7 @@ Status AnalyticEvalNode::GetNextOutputBatch(
     return Status::OK();
   }
 
-  const int num_child_tuples = child(0)->row_desc().tuple_descriptors().size();
+  const int num_child_tuples = 
child(0)->row_desc()->tuple_descriptors().size();
   RowBatch input_batch(child(0)->row_desc(), output_batch->capacity(), 
mem_tracker());
   int64_t stream_idx = input_stream_->rows_returned();
   RETURN_IF_ERROR(input_stream_->GetNext(&input_batch, eos));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc 
b/be/src/exec/blocking-join-node.cc
index 486c5cc..cd3dd46 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -81,8 +81,8 @@ Status BlockingJoinNode::Prepare(RuntimeState* state) {
 
   // Validate the row desc layout is what we expect because the current join
   // implementation relies on it to enable some optimizations.
-  int num_left_tuples = child(0)->row_desc().tuple_descriptors().size();
-  int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
+  int num_left_tuples = child(0)->row_desc()->tuple_descriptors().size();
+  int num_build_tuples = child(1)->row_desc()->tuple_descriptors().size();
 
 #ifndef NDEBUG
   switch (join_op_) {
@@ -90,13 +90,13 @@ Status BlockingJoinNode::Prepare(RuntimeState* state) {
     case TJoinOp::LEFT_SEMI_JOIN:
     case TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN: {
       // Only return the surviving probe-side tuples.
-      DCHECK(row_desc().Equals(child(0)->row_desc()));
+      DCHECK(row_desc()->Equals(*child(0)->row_desc()));
       break;
     }
     case TJoinOp::RIGHT_ANTI_JOIN:
     case TJoinOp::RIGHT_SEMI_JOIN: {
       // Only return the surviving build-side tuples.
-      DCHECK(row_desc().Equals(child(1)->row_desc()));
+      DCHECK(row_desc()->Equals(*child(1)->row_desc()));
       break;
     }
     default: {
@@ -107,12 +107,12 @@ Status BlockingJoinNode::Prepare(RuntimeState* state) {
       //   result[1] = build[0]
       //   result[2] = build[1]
       for (int i = 0; i < num_left_tuples; ++i) {
-        TupleDescriptor* desc = child(0)->row_desc().tuple_descriptors()[i];
-        DCHECK_EQ(i, row_desc().GetTupleIdx(desc->id()));
+        TupleDescriptor* desc = child(0)->row_desc()->tuple_descriptors()[i];
+        DCHECK_EQ(i, row_desc()->GetTupleIdx(desc->id()));
       }
       for (int i = 0; i < num_build_tuples; ++i) {
-        TupleDescriptor* desc = child(1)->row_desc().tuple_descriptors()[i];
-        DCHECK_EQ(num_left_tuples + i, row_desc().GetTupleIdx(desc->id()));
+        TupleDescriptor* desc = child(1)->row_desc()->tuple_descriptors()[i];
+        DCHECK_EQ(num_left_tuples + i, row_desc()->GetTupleIdx(desc->id()));
       }
       break;
     }
@@ -307,14 +307,14 @@ void BlockingJoinNode::DebugString(int indentation_level, 
stringstream* out) con
 string BlockingJoinNode::GetLeftChildRowString(TupleRow* row) {
   stringstream out;
   out << "[";
-  int num_probe_tuple_rows = child(0)->row_desc().tuple_descriptors().size();
-  for (int i = 0; i < row_desc().tuple_descriptors().size(); ++i) {
+  int num_probe_tuple_rows = child(0)->row_desc()->tuple_descriptors().size();
+  for (int i = 0; i < row_desc()->tuple_descriptors().size(); ++i) {
     if (i != 0) out << " ";
     if (i >= num_probe_tuple_rows) {
       // Build row is not yet populated, print NULL
-      out << PrintTuple(NULL, *row_desc().tuple_descriptors()[i]);
+      out << PrintTuple(NULL, *row_desc()->tuple_descriptors()[i]);
     } else {
-      out << PrintTuple(row->GetTuple(i), *row_desc().tuple_descriptors()[i]);
+      out << PrintTuple(row->GetTuple(i), *row_desc()->tuple_descriptors()[i]);
     }
   }
   out << "]";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/data-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index 437343e..f7697c8 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -41,15 +41,15 @@ using strings::Substitute;
 
 namespace impala {
 
-DataSink::DataSink(const RowDescriptor& row_desc) :
-    closed_(false), row_desc_(row_desc), mem_tracker_(NULL) {}
+DataSink::DataSink(const RowDescriptor* row_desc)
+  : closed_(false), row_desc_(row_desc), mem_tracker_(NULL) {}
 
 DataSink::~DataSink() {
   DCHECK(closed_);
 }
 
 Status DataSink::Create(const TPlanFragmentCtx& fragment_ctx,
-    const TPlanFragmentInstanceCtx& fragment_instance_ctx, const 
RowDescriptor& row_desc,
+    const TPlanFragmentInstanceCtx& fragment_instance_ctx, const 
RowDescriptor* row_desc,
     RuntimeState* state, DataSink** sink) {
   const TDataSink& thrift_sink = fragment_ctx.fragment.output_sink;
   const vector<TExpr>& thrift_output_exprs = 
fragment_ctx.fragment.output_exprs;
@@ -103,7 +103,7 @@ Status DataSink::Create(const TPlanFragmentCtx& 
fragment_ctx,
 
 Status DataSink::Init(const vector<TExpr>& thrift_output_exprs,
     const TDataSink& tsink, RuntimeState* state) {
-  return ScalarExpr::Create(thrift_output_exprs, row_desc_, state, 
&output_exprs_);
+  return ScalarExpr::Create(thrift_output_exprs, *row_desc_, state, 
&output_exprs_);
 }
 
 void DataSink::MergeDmlStats(const TInsertStats& src_stats,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/data-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/data-sink.h b/be/src/exec/data-sink.h
index cfd06bb..8e870f9 100644
--- a/be/src/exec/data-sink.h
+++ b/be/src/exec/data-sink.h
@@ -54,7 +54,7 @@ class TInsertStats;
 /// Close() is called to release any resources before destroying the sink.
 class DataSink {
  public:
-  DataSink(const RowDescriptor& row_desc);
+  DataSink(const RowDescriptor* row_desc);
   virtual ~DataSink();
 
   /// Return the name to use in profiles, etc.
@@ -86,7 +86,7 @@ class DataSink {
   /// thrift_sink.
   static Status Create(const TPlanFragmentCtx& fragment_ctx,
       const TPlanFragmentInstanceCtx& fragment_instance_ctx,
-      const RowDescriptor& row_desc, RuntimeState* state, DataSink** sink);
+      const RowDescriptor* row_desc, RuntimeState* state, DataSink** sink);
 
   /// Merges one update to the DML stats for a partition. dst_stats will have 
the
   /// combined stats of src_stats and dst_stats after this method returns.
@@ -109,8 +109,9 @@ class DataSink {
   /// Close().
   bool closed_;
 
-  /// The row descriptor for the rows consumed by the sink. Not owned.
-  const RowDescriptor& row_desc_;
+  /// The row descriptor for the rows consumed by the sink. Owned by root exec 
node of
+  /// plan tree, which feeds into this sink.
+  const RowDescriptor* row_desc_;
 
   /// The runtime profile for this DataSink. Initialized in Prepare(). Not 
owned.
   RuntimeProfile* profile_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 4d32f80..5fe5fb4 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -79,7 +79,7 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
   // TODO: figure out appropriate buffer size
   DCHECK_GT(num_senders_, 0);
   stream_recvr_ = ExecEnv::GetInstance()->stream_mgr()->CreateRecvr(state,
-      input_row_desc_, state->fragment_instance_id(), id_, num_senders_,
+      &input_row_desc_, state->fragment_instance_id(), id_, num_senders_,
       FLAGS_exchg_node_buffer_size_bytes, runtime_profile(), is_merging_);
   if (is_merging_) {
     less_than_.reset(
@@ -197,7 +197,7 @@ Status ExchangeNode::GetNext(RuntimeState* state, RowBatch* 
output_batch, bool*
     *eos = (input_batch_ == NULL);
     if (*eos) return Status::OK();
     next_row_idx_ = 0;
-    
DCHECK(input_batch_->row_desc().LayoutIsPrefixOf(output_batch->row_desc()));
+    
DCHECK(input_batch_->row_desc()->LayoutIsPrefixOf(*output_batch->row_desc()));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index ceb3c49..d64872b 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -43,7 +43,8 @@ class TPlan;
 class TupleRow;
 class TDebugOptions;
 
-/// Superclass of all executor nodes.
+/// Superclass of all execution nodes.
+///
 /// All subclasses need to make sure to check RuntimeState::is_cancelled()
 /// periodically in order to ensure timely termination after the cancellation
 /// flag gets set.
@@ -178,7 +179,13 @@ class ExecNode {
 
   int id() const { return id_; }
   TPlanNodeType::type type() const { return type_; }
-  const RowDescriptor& row_desc() const { return row_descriptor_; }
+
+  /// Returns the row descriptor for rows produced by this node. The 
RowDescriptor is
+  /// constant for the lifetime of the fragment instance, and so is shared by 
reference
+  /// across the plan tree, including in RowBatches. The lifetime of the 
descriptor is the
+  /// same as the lifetime of this node.
+  const RowDescriptor* row_desc() const { return &row_descriptor_; }
+
   ExecNode* child(int i) { return children_[i]; }
   int num_children() const { return children_.size(); }
   SubplanNode* get_containing_subplan() const { return containing_subplan_; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hash-join-node.cc b/be/src/exec/hash-join-node.cc
index 35202a2..deee7f2 100644
--- a/be/src/exec/hash-join-node.cc
+++ b/be/src/exec/hash-join-node.cc
@@ -76,12 +76,12 @@ Status HashJoinNode::Init(const TPlanNode& tnode, 
RuntimeState* state) {
 
   for (int i = 0; i < eq_join_conjuncts.size(); ++i) {
     ScalarExpr* probe_expr;
-    RETURN_IF_ERROR(ScalarExpr::Create(eq_join_conjuncts[i].left, 
child(0)->row_desc(),
-        state, &probe_expr));
+    RETURN_IF_ERROR(ScalarExpr::Create(
+        eq_join_conjuncts[i].left, *child(0)->row_desc(), state, &probe_expr));
     probe_exprs_.push_back(probe_expr);
     ScalarExpr* build_expr;
-    RETURN_IF_ERROR(ScalarExpr::Create(eq_join_conjuncts[i].right, 
child(1)->row_desc(),
-        state, &build_expr));
+    RETURN_IF_ERROR(ScalarExpr::Create(
+        eq_join_conjuncts[i].right, *child(1)->row_desc(), state, 
&build_expr));
     build_exprs_.push_back(build_expr);
     is_not_distinct_from_.push_back(eq_join_conjuncts[i].is_not_distinct_from);
   }
@@ -89,7 +89,7 @@ Status HashJoinNode::Init(const TPlanNode& tnode, 
RuntimeState* state) {
   // other_join_conjunct_evals_ are evaluated in the context of rows assembled 
from
   // all build and probe tuples; full_row_desc is not necessarily the same as 
the output
   // row desc, e.g., because semi joins only return the build xor probe tuples
-  RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc());
+  RowDescriptor full_row_desc(*child(0)->row_desc(), *child(1)->row_desc());
   RETURN_IF_ERROR(ScalarExpr::Create(tnode.hash_join_node.other_join_conjuncts,
       full_row_desc, state, &other_join_conjuncts_));
 
@@ -107,7 +107,7 @@ Status HashJoinNode::Init(const TPlanNode& tnode, 
RuntimeState* state) {
     filters_.push_back(state->filter_bank()->RegisterFilter(tfilter, true));
     ScalarExpr* filter_expr;
     RETURN_IF_ERROR(
-        ScalarExpr::Create(tfilter.src_expr, child(1)->row_desc(), state, 
&filter_expr));
+        ScalarExpr::Create(tfilter.src_expr, *child(1)->row_desc(), state, 
&filter_expr));
     filter_exprs_.push_back(filter_expr);
   }
   return Status::OK();
@@ -135,7 +135,7 @@ Status HashJoinNode::Prepare(RuntimeState* state) {
                                 false, std::logical_or<bool>());
 
   RETURN_IF_ERROR(OldHashTable::Create(pool_, state, build_exprs_, 
probe_exprs_,
-      filter_exprs_, child(1)->row_desc().tuple_descriptors().size(), 
stores_nulls,
+      filter_exprs_, child(1)->row_desc()->tuple_descriptors().size(), 
stores_nulls,
       is_not_distinct_from_, state->fragment_hash_seed(), mem_tracker(), 
filters_,
       &hash_tbl_));
   build_pool_.reset(new MemPool(mem_tracker()));
@@ -246,7 +246,7 @@ Status HashJoinNode::ProcessBuildInput(RuntimeState* state) 
{
     } else {
       process_build_batch_fn_(this, &build_batch);
     }
-    VLOG_ROW << hash_tbl_->DebugString(true, false, &child(1)->row_desc());
+    VLOG_ROW << hash_tbl_->DebugString(true, false, child(1)->row_desc());
 
     COUNTER_SET(build_row_counter_, hash_tbl_->size());
     COUNTER_SET(build_buckets_counter_, hash_tbl_->num_buckets());
@@ -340,7 +340,7 @@ Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* 
out_batch, bool* eos
       hash_tbl_iterator_.Next<true>();
       if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) {
         out_batch->CommitLastRow();
-        VLOG_ROW << "match row: " << PrintRow(out_row, row_desc());
+        VLOG_ROW << "match row: " << PrintRow(out_row, *row_desc());
         ++num_rows_returned_;
         COUNTER_SET(rows_returned_counter_, num_rows_returned_);
         if (out_batch->AtCapacity() || ReachedLimit()) {
@@ -359,7 +359,7 @@ Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* 
out_batch, bool* eos
       CreateOutputRow(out_row, current_probe_row_, NULL);
       if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) {
         out_batch->CommitLastRow();
-        VLOG_ROW << "match row: " << PrintRow(out_row, row_desc());
+        VLOG_ROW << "match row: " << PrintRow(out_row, *row_desc());
         ++num_rows_returned_;
         COUNTER_SET(rows_returned_counter_, num_rows_returned_);
         matched_probe_ = true;
@@ -429,7 +429,7 @@ Status HashJoinNode::GetNext(RuntimeState* state, RowBatch* 
out_batch, bool* eos
       CreateOutputRow(out_row, NULL, build_row);
       if (EvalConjuncts(conjunct_evals_.data(), num_conjuncts, out_row)) {
         out_batch->CommitLastRow();
-        VLOG_ROW << "match row: " << PrintRow(out_row, row_desc());
+        VLOG_ROW << "match row: " << PrintRow(out_row, *row_desc());
         ++num_rows_returned_;
         COUNTER_SET(rows_returned_counter_, num_rows_returned_);
         if (ReachedLimit()) {
@@ -556,8 +556,8 @@ Function* HashJoinNode::CodegenCreateOutputRow(LlvmCodeGen* 
codegen) {
   Value* probe_row_arg = builder.CreateBitCast(args[2], 
tuple_row_working_type, "probe");
   Value* build_row_arg = builder.CreateBitCast(args[3], 
tuple_row_working_type, "build");
 
-  int num_probe_tuples = child(0)->row_desc().tuple_descriptors().size();
-  int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
+  int num_probe_tuples = child(0)->row_desc()->tuple_descriptors().size();
+  int num_build_tuples = child(1)->row_desc()->tuple_descriptors().size();
 
   // Copy probe row
   codegen->CodegenMemcpy(&builder, out_row_arg, probe_row_arg, 
probe_tuple_row_size_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/hbase-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index 8f9c64f..c957793 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -34,7 +34,7 @@ namespace impala {
 const static string& ROOT_PARTITION_KEY =
     g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
 
-HBaseTableSink::HBaseTableSink(const RowDescriptor& row_desc, const TDataSink& 
tsink)
+HBaseTableSink::HBaseTableSink(const RowDescriptor* row_desc, const TDataSink& 
tsink)
   : DataSink(row_desc),
     table_id_(tsink.table_sink.target_table_id),
     table_desc_(NULL),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/hbase-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hbase-table-sink.h b/be/src/exec/hbase-table-sink.h
index 714c90b..469621c 100644
--- a/be/src/exec/hbase-table-sink.h
+++ b/be/src/exec/hbase-table-sink.h
@@ -35,7 +35,7 @@ namespace impala {
 /// eventually be written into an HBase table.
 class HBaseTableSink : public DataSink {
  public:
-  HBaseTableSink(const RowDescriptor& row_desc, const TDataSink& tsink);
+  HBaseTableSink(const RowDescriptor* row_desc, const TDataSink& tsink);
   virtual std::string GetName() { return "HBaseTableSink"; }
   virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker);
   virtual Status Send(RuntimeState* state, RowBatch* batch);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc 
b/be/src/exec/hdfs-parquet-scanner.cc
index a94359b..6d4b212 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -154,24 +154,24 @@ DiskIoMgr::ScanRange* 
HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) {
 namespace impala {
 
 HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, 
RuntimeState* state)
-    : HdfsScanner(scan_node, state),
-      row_group_idx_(-1),
-      row_group_rows_read_(0),
-      advance_row_group_(true),
-      min_max_tuple_buffer_(scan_node->mem_tracker()),
-      row_batches_produced_(0),
-      scratch_batch_(new ScratchTupleBatch(
-          scan_node->row_desc(), state_->batch_size(), 
scan_node->mem_tracker())),
-      metadata_range_(NULL),
-      dictionary_pool_(new MemPool(scan_node->mem_tracker())),
-      dict_filter_tuple_backing_(scan_node->mem_tracker()),
-      assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
-      process_footer_timer_stats_(NULL),
-      num_cols_counter_(NULL),
-      num_row_groups_counter_(NULL),
-      num_scanners_with_no_reads_counter_(NULL),
-      num_dict_filtered_row_groups_counter_(NULL),
-      codegend_process_scratch_batch_fn_(NULL) {
+  : HdfsScanner(scan_node, state),
+    row_group_idx_(-1),
+    row_group_rows_read_(0),
+    advance_row_group_(true),
+    min_max_tuple_buffer_(scan_node->mem_tracker()),
+    row_batches_produced_(0),
+    scratch_batch_(new ScratchTupleBatch(
+        *scan_node->row_desc(), state_->batch_size(), 
scan_node->mem_tracker())),
+    metadata_range_(NULL),
+    dictionary_pool_(new MemPool(scan_node->mem_tracker())),
+    dict_filter_tuple_backing_(scan_node->mem_tracker()),
+    assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
+    process_footer_timer_stats_(NULL),
+    num_cols_counter_(NULL),
+    num_row_groups_counter_(NULL),
+    num_scanners_with_no_reads_counter_(NULL),
+    num_dict_filtered_row_groups_counter_(NULL),
+    codegend_process_scratch_batch_fn_(NULL) {
   assemble_rows_timer_.Stop();
 }
 
@@ -1006,7 +1006,7 @@ int HdfsParquetScanner::TransferScratchTuples(RowBatch* 
dst_batch) {
   // never be empty.
   DCHECK_LT(dst_batch->num_rows(), dst_batch->capacity());
   DCHECK_EQ(scan_node_->tuple_idx(), 0);
-  DCHECK_EQ(dst_batch->row_desc().tuple_descriptors().size(), 1);
+  DCHECK_EQ(dst_batch->row_desc()->tuple_descriptors().size(), 1);
   if (scratch_batch_->tuple_byte_size == 0) {
     Tuple** output_row =
         reinterpret_cast<Tuple**>(dst_batch->GetRow(dst_batch->num_rows()));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index eeb7cfe..32b7372 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -128,7 +128,7 @@ Status HdfsScanNodeBase::Init(const TPlanNode& tnode, 
RuntimeState* state) {
     }
     ScalarExpr* filter_expr;
     RETURN_IF_ERROR(
-        ScalarExpr::Create(target.target_expr, row_desc(), state, 
&filter_expr));
+        ScalarExpr::Create(target.target_expr, *row_desc(), state, 
&filter_expr));
     filter_exprs_.push_back(filter_expr);
 
     // TODO: Move this to Prepare()
@@ -732,9 +732,9 @@ void HdfsScanNodeBase::InitNullCollectionValues(const 
TupleDescriptor* tuple_des
 }
 
 void HdfsScanNodeBase::InitNullCollectionValues(RowBatch* row_batch) const {
-  DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
+  DCHECK_EQ(row_batch->row_desc()->tuple_descriptors().size(), 1);
   const TupleDescriptor& tuple_desc =
-      *row_batch->row_desc().tuple_descriptors()[tuple_idx()];
+      *row_batch->row_desc()->tuple_descriptors()[tuple_idx()];
   if (tuple_desc.collection_slots().empty()) return;
   for (int i = 0; i < row_batch->num_rows(); ++i) {
     Tuple* tuple = row_batch->GetRow(i)->GetTuple(tuple_idx());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/hdfs-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index e38439b..e488717 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -56,7 +56,7 @@ namespace impala {
 const static string& ROOT_PARTITION_KEY =
     g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
 
-HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc, const TDataSink& 
tsink)
+HdfsTableSink::HdfsTableSink(const RowDescriptor* row_desc, const TDataSink& 
tsink)
   : DataSink(row_desc),
     table_desc_(nullptr),
     default_partition_(nullptr),
@@ -85,7 +85,7 @@ Status HdfsTableSink::Init(const vector<TExpr>& 
thrift_output_exprs,
   RETURN_IF_ERROR(DataSink::Init(thrift_output_exprs, tsink, state));
   DCHECK(tsink.__isset.table_sink);
   
RETURN_IF_ERROR(ScalarExpr::Create(tsink.table_sink.hdfs_table_sink.partition_key_exprs,
-      row_desc_, state, &partition_key_exprs_));
+      *row_desc_, state, &partition_key_exprs_));
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/hdfs-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h
index 28581f9..cece5bf 100644
--- a/be/src/exec/hdfs-table-sink.h
+++ b/be/src/exec/hdfs-table-sink.h
@@ -127,7 +127,7 @@ struct OutputPartition {
 /// This is consistent with Hive's behavior.
 class HdfsTableSink : public DataSink {
  public:
-  HdfsTableSink(const RowDescriptor& row_desc, const TDataSink& tsink);
+  HdfsTableSink(const RowDescriptor* row_desc, const TDataSink& tsink);
 
   virtual std::string GetName() { return "HdfsTableSink"; }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/kudu-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scanner.cc b/be/src/exec/kudu-scanner.cc
index 3fb0a18..514ebc4 100644
--- a/be/src/exec/kudu-scanner.cc
+++ b/be/src/exec/kudu-scanner.cc
@@ -192,9 +192,9 @@ Status KuduScanner::DecodeRowsIntoRowBatch(RowBatch* 
row_batch, Tuple** tuple_me
   int num_rows = cur_kudu_batch_.NumRows();
 
   for (int krow_idx = cur_kudu_batch_num_read_; krow_idx < num_rows; 
++krow_idx) {
-    Tuple* kudu_tuple = const_cast<Tuple*>(reinterpret_cast<const Tuple*>(
-        cur_kudu_batch_.direct_data().data() +
-        (krow_idx * scan_node_->row_desc().GetRowSize())));
+    Tuple* kudu_tuple = const_cast<Tuple*>(
+        reinterpret_cast<const Tuple*>(cur_kudu_batch_.direct_data().data()
+            + (krow_idx * scan_node_->row_desc()->GetRowSize())));
     ++cur_kudu_batch_num_read_;
 
     // Kudu tuples containing TIMESTAMP columns (UNIXTIME_MICROS in Kudu, 
stored as an

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 32aab79..bfc34a6 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -71,11 +71,11 @@ const static string& ROOT_PARTITION_KEY =
 // Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
 const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
 
-KuduTableSink::KuduTableSink(const RowDescriptor& row_desc, const TDataSink& 
tsink)
-    : DataSink(row_desc),
-      table_id_(tsink.table_sink.target_table_id),
-      sink_action_(tsink.table_sink.action),
-      kudu_table_sink_(tsink.table_sink.kudu_table_sink) {
+KuduTableSink::KuduTableSink(const RowDescriptor* row_desc, const TDataSink& 
tsink)
+  : DataSink(row_desc),
+    table_id_(tsink.table_sink.target_table_id),
+    sink_action_(tsink.table_sink.action),
+    kudu_table_sink_(tsink.table_sink.kudu_table_sink) {
   DCHECK(tsink.__isset.table_sink);
   DCHECK(KuduIsAvailable());
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/kudu-table-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index dcf5241..23e7033 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -53,7 +53,7 @@ namespace impala {
 /// status. All reported errors (ignored or not) will be logged via the 
RuntimeState.
 class KuduTableSink : public DataSink {
  public:
-  KuduTableSink(const RowDescriptor& row_desc, const TDataSink& tsink);
+  KuduTableSink(const RowDescriptor* row_desc, const TDataSink& tsink);
 
   virtual std::string GetName() { return "KuduTableSink"; }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/nested-loop-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-builder.cc 
b/be/src/exec/nested-loop-join-builder.cc
index 86848f5..67e6ed6 100644
--- a/be/src/exec/nested-loop-join-builder.cc
+++ b/be/src/exec/nested-loop-join-builder.cc
@@ -27,7 +27,7 @@
 
 using namespace impala;
 
-NljBuilder::NljBuilder(const RowDescriptor& row_desc, RuntimeState* state)
+NljBuilder::NljBuilder(const RowDescriptor* row_desc, RuntimeState* state)
   : DataSink(row_desc), build_batch_cache_(row_desc, state->batch_size()) {}
 
 Status NljBuilder::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker) {
@@ -105,4 +105,3 @@ Status NljBuilder::DeepCopyBuildBatches(RuntimeState* 
state) {
   input_build_batches_.Reset();
   return Status::OK();
 }
-

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/nested-loop-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-builder.h 
b/be/src/exec/nested-loop-join-builder.h
index 86f497b..6e60899 100644
--- a/be/src/exec/nested-loop-join-builder.h
+++ b/be/src/exec/nested-loop-join-builder.h
@@ -37,7 +37,7 @@ namespace impala {
 /// is used and all data is deep copied into memory owned by the builder.
 class NljBuilder : public DataSink {
  public:
-  NljBuilder(const RowDescriptor& row_desc, RuntimeState* state);
+  NljBuilder(const RowDescriptor* row_desc, RuntimeState* state);
 
   /// Implementations of DataSink interface methods.
   virtual std::string GetName() override { return "Nested Loop Join Builder"; }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/nested-loop-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/nested-loop-join-node.cc 
b/be/src/exec/nested-loop-join-node.cc
index 14d9ac8..01a7f19 100644
--- a/be/src/exec/nested-loop-join-node.cc
+++ b/be/src/exec/nested-loop-join-node.cc
@@ -53,7 +53,7 @@ Status NestedLoopJoinNode::Init(const TPlanNode& tnode, 
RuntimeState* state) {
   DCHECK(tnode.__isset.nested_loop_join_node);
   // join_conjunct_evals_ are evaluated in the context of rows assembled from
   // all inner and outer tuples.
-  RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc());
+  RowDescriptor full_row_desc(*child(0)->row_desc(), *child(1)->row_desc());
   
RETURN_IF_ERROR(ScalarExpr::Create(tnode.nested_loop_join_node.join_conjuncts,
       full_row_desc, state, &join_conjuncts_));
   DCHECK(tnode.nested_loop_join_node.join_op != TJoinOp::CROSS_JOIN ||
@@ -316,7 +316,7 @@ Status 
NestedLoopJoinNode::GetNextLeftSemiJoin(RuntimeState* state,
       // A match is found. Create the output row from the probe row.
       TupleRow* output_row = output_batch->GetRow(output_batch->AddRow());
       output_batch->CopyRow(current_probe_row_, output_row);
-      VLOG_ROW << "match row: " << PrintRow(output_row, row_desc());
+      VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
       output_batch->CommitLastRow();
       ++num_rows_returned_;
       if (ReachedLimit()) {
@@ -433,7 +433,7 @@ Status 
NestedLoopJoinNode::GetNextRightSemiJoin(RuntimeState* state,
       output_batch->CopyRow(build_row_iterator_.GetRow(), output_row);
       build_row_iterator_.Next();
       ++current_build_row_idx_;
-      VLOG_ROW << "match row: " << PrintRow(output_row, row_desc());
+      VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
       output_batch->CommitLastRow();
       ++num_rows_returned_;
       if (output_batch->AtCapacity()) {
@@ -521,7 +521,7 @@ Status 
NestedLoopJoinNode::ProcessUnmatchedProbeRow(RuntimeState* state,
   }
   // Evaluate all the other (non-join) conjuncts.
   if (EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) {
-    VLOG_ROW << "match row:" << PrintRow(output_row, row_desc());
+    VLOG_ROW << "match row:" << PrintRow(output_row, *row_desc());
     output_batch->CommitLastRow();
     ++num_rows_returned_;
     COUNTER_ADD(rows_returned_counter_, 1);
@@ -576,7 +576,7 @@ Status NestedLoopJoinNode::ProcessUnmatchedBuildRows(
     // Evaluate conjuncts that don't affect the matching rows of the join on 
the
     // result row.
     if (EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) {
-      VLOG_ROW << "match row: " << PrintRow(output_row, row_desc());
+      VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
       output_batch->CommitLastRow();
       ++num_rows_returned_;
       if (output_batch->AtCapacity()) {
@@ -628,7 +628,7 @@ Status NestedLoopJoinNode::FindBuildMatches(
       matching_build_rows_->Set(current_build_row_idx_ - 1, true);
     }
     if (!EvalConjuncts(conjunct_evals, num_conjuncts, output_row)) continue;
-    VLOG_ROW << "match row: " << PrintRow(output_row, row_desc());
+    VLOG_ROW << "match row: " << PrintRow(output_row, *row_desc());
     output_batch->CommitLastRow();
     ++num_rows_returned_;
     if (output_batch->AtCapacity()) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/partitioned-aggregation-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.cc 
b/be/src/exec/partitioned-aggregation-node.cc
index 20ee5e9..83ebbc2 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -105,7 +105,7 @@ PartitionedAggregationNode::PartitionedAggregationNode(
   : ExecNode(pool, tnode, descs),
     intermediate_tuple_id_(tnode.agg_node.intermediate_tuple_id),
     intermediate_tuple_desc_(descs.GetTupleDescriptor(intermediate_tuple_id_)),
-    intermediate_row_desc_(pool->Add(new 
RowDescriptor(intermediate_tuple_desc_, false))),
+    intermediate_row_desc_(intermediate_tuple_desc_, false),
     output_tuple_id_(tnode.agg_node.output_tuple_id),
     output_tuple_desc_(descs.GetTupleDescriptor(output_tuple_id_)),
     needs_finalize_(tnode.agg_node.need_finalize),
@@ -150,7 +150,7 @@ Status PartitionedAggregationNode::Init(const TPlanNode& 
tnode, RuntimeState* st
   DCHECK(intermediate_tuple_desc_ != nullptr);
   DCHECK(output_tuple_desc_ != nullptr);
   DCHECK_EQ(intermediate_tuple_desc_->slots().size(), 
output_tuple_desc_->slots().size());
-  const RowDescriptor& row_desc = child(0)->row_desc();
+  const RowDescriptor& row_desc = *child(0)->row_desc();
   RETURN_IF_ERROR(ScalarExpr::Create(tnode.agg_node.grouping_exprs, row_desc,
       state, &grouping_exprs_));
 
@@ -162,7 +162,7 @@ Status PartitionedAggregationNode::Init(const TPlanNode& 
tnode, RuntimeState* st
     SlotRef* build_expr = pool_->Add(desc->type().type != TYPE_NULL ?
         new SlotRef(desc) : new SlotRef(desc, TYPE_BOOLEAN));
     build_exprs_.push_back(build_expr);
-    RETURN_IF_ERROR(build_expr->Init(*intermediate_row_desc_, state));
+    RETURN_IF_ERROR(build_expr->Init(intermediate_row_desc_, state));
     if (build_expr->type().IsVarLenStringType()) 
string_grouping_exprs_.push_back(i);
   }
 
@@ -232,7 +232,7 @@ Status PartitionedAggregationNode::Prepare(RuntimeState* 
state) {
   // TODO: Is there a need to create the stream here? If memory reservations 
work we may
   // be able to create this stream lazily and only whenever we need to spill.
   if (!is_streaming_preagg_ && needs_serialize_ && block_mgr_client_ != NULL) {
-    serialize_stream_.reset(new BufferedTupleStream(state, 
*intermediate_row_desc_,
+    serialize_stream_.reset(new BufferedTupleStream(state, 
&intermediate_row_desc_,
         state->block_mgr(), block_mgr_client_, false /* 
use_initial_small_buffers */,
         false /* read_write */));
     RETURN_IF_ERROR(serialize_stream_->Init(id(), runtime_profile(), false));
@@ -294,7 +294,7 @@ Status PartitionedAggregationNode::Open(RuntimeState* 
state) {
     if (UNLIKELY(VLOG_ROW_IS_ON)) {
       for (int i = 0; i < batch.num_rows(); ++i) {
         TupleRow* row = batch.GetRow(i);
-        VLOG_ROW << "input row: " << PrintRow(row, children_[0]->row_desc());
+        VLOG_ROW << "input row: " << PrintRow(row, *children_[0]->row_desc());
       }
     }
 
@@ -369,7 +369,7 @@ Status 
PartitionedAggregationNode::HandleOutputStrings(RowBatch* row_batch,
 Status PartitionedAggregationNode::CopyStringData(const SlotDescriptor& 
slot_desc,
     RowBatch* row_batch, int first_row_idx, MemPool* pool) {
   DCHECK(slot_desc.type().IsVarLenStringType());
-  DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
+  DCHECK_EQ(row_batch->row_desc()->tuple_descriptors().size(), 1);
   FOREACH_ROW(row_batch, first_row_idx, batch_iter) {
     Tuple* tuple = batch_iter.Get()->GetTuple(0);
     StringValue* sv = reinterpret_cast<StringValue*>(
@@ -705,7 +705,7 @@ Status PartitionedAggregationNode::Partition::InitStreams() 
{
   }
 
   aggregated_row_stream.reset(new BufferedTupleStream(parent->state_,
-      *parent->intermediate_row_desc_, parent->state_->block_mgr(),
+      &parent->intermediate_row_desc_, parent->state_->block_mgr(),
       parent->block_mgr_client_, true /* use_initial_small_buffers */,
       false /* read_write */, external_varlen_slots));
   RETURN_IF_ERROR(
@@ -803,10 +803,10 @@ Status 
PartitionedAggregationNode::Partition::SerializeStreamForSpilling() {
     // when we need to spill again. We need to have this available before we 
need
     // to spill to make sure it is available. This should be acquirable since 
we just
     // freed at least one buffer from this partition's (old) 
aggregated_row_stream.
-    parent->serialize_stream_.reset(new BufferedTupleStream(parent->state_,
-        *parent->intermediate_row_desc_, parent->state_->block_mgr(),
-        parent->block_mgr_client_, false /* use_initial_small_buffers */,
-        false /* read_write */));
+    parent->serialize_stream_.reset(
+        new BufferedTupleStream(parent->state_, 
&parent->intermediate_row_desc_,
+            parent->state_->block_mgr(), parent->block_mgr_client_,
+            false /* use_initial_small_buffers */, false /* read_write */));
     status = parent->serialize_stream_->Init(parent->id(), 
parent->runtime_profile(),
         false);
     if (status.ok()) {
@@ -1272,8 +1272,9 @@ Status 
PartitionedAggregationNode::ProcessStream(BufferedTupleStream* input_stre
 
     TPrefetchMode::type prefetch_mode = state_->query_options().prefetch_mode;
     bool eos = false;
-    RowBatch batch(AGGREGATED_ROWS ? *intermediate_row_desc_ : 
children_[0]->row_desc(),
-                   state_->batch_size(), mem_tracker());
+    const RowDescriptor* desc =
+        AGGREGATED_ROWS ? &intermediate_row_desc_ : children_[0]->row_desc();
+    RowBatch batch(desc, state_->batch_size(), mem_tracker());
     do {
       RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos));
       RETURN_IF_ERROR(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/partitioned-aggregation-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-aggregation-node.h 
b/be/src/exec/partitioned-aggregation-node.h
index ccac45b..066dc28 100644
--- a/be/src/exec/partitioned-aggregation-node.h
+++ b/be/src/exec/partitioned-aggregation-node.h
@@ -185,7 +185,7 @@ class PartitionedAggregationNode : public ExecNode {
   /// nor this node's output row desc may contain the intermediate tuple, e.g.,
   /// in a single-node plan with an intermediate tuple different from the 
output tuple.
   /// Lives in the query state's obj_pool.
-  RowDescriptor* intermediate_row_desc_;
+  RowDescriptor intermediate_row_desc_;
 
   /// Tuple into which Finalize() results are stored. Possibly the same as
   /// the intermediate tuple.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/partitioned-hash-join-builder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.cc 
b/be/src/exec/partitioned-hash-join-builder.cc
index a17e295..a214f74 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -49,7 +49,7 @@ using namespace strings;
 using std::unique_ptr;
 
 PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op,
-    const RowDescriptor& probe_row_desc, const RowDescriptor& build_row_desc,
+    const RowDescriptor* probe_row_desc, const RowDescriptor* build_row_desc,
     RuntimeState* state)
   : DataSink(build_row_desc),
     runtime_state_(state),
@@ -86,7 +86,7 @@ Status PhjBuilder::InitExprsAndFilters(RuntimeState* state,
   for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
     ScalarExpr* build_expr;
     RETURN_IF_ERROR(
-        ScalarExpr::Create(eq_join_conjunct.right, row_desc_, state, 
&build_expr));
+        ScalarExpr::Create(eq_join_conjunct.right, *row_desc_, state, 
&build_expr));
     build_exprs_.push_back(build_expr);
     is_not_distinct_from_.push_back(eq_join_conjunct.is_not_distinct_from);
   }
@@ -104,7 +104,7 @@ Status PhjBuilder::InitExprsAndFilters(RuntimeState* state,
     }
     ScalarExpr* filter_expr;
     RETURN_IF_ERROR(
-        ScalarExpr::Create(filter_desc.src_expr, row_desc_, state, 
&filter_expr));
+        ScalarExpr::Create(filter_desc.src_expr, *row_desc_, state, 
&filter_expr));
     filter_exprs_.push_back(filter_expr);
 
     // TODO: Move to Prepare().
@@ -129,7 +129,7 @@ Status PhjBuilder::Prepare(RuntimeState* state, MemTracker* 
parent_mem_tracker)
   RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
   RETURN_IF_ERROR(HashTableCtx::Create(&pool_, state, build_exprs_, 
build_exprs_,
       HashTableStoresNulls(), is_not_distinct_from_, 
state->fragment_hash_seed(),
-      MAX_PARTITION_DEPTH, row_desc_.tuple_descriptors().size(), 
expr_mem_pool(),
+      MAX_PARTITION_DEPTH, row_desc_->tuple_descriptors().size(), 
expr_mem_pool(),
       &ht_ctx_));
 
   DCHECK_EQ(filter_exprs_.size(), filter_ctxs_.size());
@@ -678,7 +678,7 @@ Status PhjBuilder::Partition::BuildHashTable(bool* built) {
       HashTable::EstimateNumBuckets(build_rows()->num_rows()) :
       state->batch_size() * 2;
   hash_tbl_.reset(HashTable::Create(state, parent_->block_mgr_client_,
-      true /* store_duplicates */, 
parent_->row_desc_.tuple_descriptors().size(),
+      true /* store_duplicates */, 
parent_->row_desc_->tuple_descriptors().size(),
       build_rows(), 1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets));
   if (!hash_tbl_->Init()) goto not_built;
 
@@ -796,7 +796,7 @@ Status PhjBuilder::CodegenProcessBuildBatch(LlvmCodeGen* 
codegen,
   // Replace some hash table parameters with constants.
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = true;
-  const int num_build_tuples = row_desc_.tuple_descriptors().size();
+  const int num_build_tuples = row_desc_->tuple_descriptors().size();
   RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, 
stores_duplicates,
       num_build_tuples, process_build_batch_fn, &replaced_constants));
   DCHECK_GE(replaced_constants.stores_nulls, 1);
@@ -878,7 +878,7 @@ Status PhjBuilder::CodegenInsertBatch(LlvmCodeGen* codegen, 
Function* hash_fn,
   // Replace hash-table parameters with constants.
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = true;
-  const int num_build_tuples = row_desc_.tuple_descriptors().size();
+  const int num_build_tuples = row_desc_->tuple_descriptors().size();
   RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, 
stores_duplicates,
       num_build_tuples, insert_batch_fn, &replaced_constants));
   DCHECK_GE(replaced_constants.stores_nulls, 1);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/partitioned-hash-join-builder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-builder.h 
b/be/src/exec/partitioned-hash-join-builder.h
index f7c1f88..daa3969 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -71,8 +71,8 @@ class PhjBuilder : public DataSink {
  public:
   class Partition;
 
-  PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor& 
probe_row_desc,
-      const RowDescriptor& build_row_desc, RuntimeState* state);
+  PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor* 
probe_row_desc,
+      const RowDescriptor* build_row_desc, RuntimeState* state);
 
   Status InitExprsAndFilters(RuntimeState* state,
       const std::vector<TEqJoinCondition>& eq_join_conjuncts,
@@ -351,7 +351,7 @@ class PhjBuilder : public DataSink {
   const TJoinOp::type join_op_;
 
   /// Descriptor for the probe rows, needed to initialize probe streams.
-  const RowDescriptor& probe_row_desc_;
+  const RowDescriptor* probe_row_desc_;
 
   /// Pool for objects with same lifetime as builder.
   ObjectPool pool_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/partitioned-hash-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/partitioned-hash-join-node.cc 
b/be/src/exec/partitioned-hash-join-node.cc
index d36aa71..2751145 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -84,18 +84,18 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& 
tnode, RuntimeState* state
 
   for (const TEqJoinCondition& eq_join_conjunct : eq_join_conjuncts) {
     ScalarExpr* probe_expr;
-    RETURN_IF_ERROR(ScalarExpr::Create(eq_join_conjunct.left, 
child(0)->row_desc(),
-        state, &probe_expr));
+    RETURN_IF_ERROR(ScalarExpr::Create(
+        eq_join_conjunct.left, *child(0)->row_desc(), state, &probe_expr));
     probe_exprs_.push_back(probe_expr);
     ScalarExpr* build_expr;
-    RETURN_IF_ERROR(ScalarExpr::Create(eq_join_conjunct.right, 
child(1)->row_desc(),
-        state, &build_expr));
+    RETURN_IF_ERROR(ScalarExpr::Create(
+        eq_join_conjunct.right, *child(1)->row_desc(), state, &build_expr));
     build_exprs_.push_back(build_expr);
   }
   // other_join_conjuncts_ are evaluated in the context of rows assembled from 
all build
   // and probe tuples; full_row_desc is not necessarily the same as the output 
row desc,
   // e.g., because semi joins only return the build xor probe tuples
-  RowDescriptor full_row_desc(child(0)->row_desc(), child(1)->row_desc());
+  RowDescriptor full_row_desc(*child(0)->row_desc(), *child(1)->row_desc());
   RETURN_IF_ERROR(ScalarExpr::Create(tnode.hash_join_node.other_join_conjuncts,
       full_row_desc, state, &other_join_conjuncts_));
   DCHECK(join_op_ != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || 
eq_join_conjuncts.size() == 1);
@@ -118,7 +118,7 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* 
state) {
   RETURN_IF_ERROR(HashTableCtx::Create(pool_, state, build_exprs_, 
probe_exprs_,
       builder_->HashTableStoresNulls(), builder_->is_not_distinct_from(),
       state->fragment_hash_seed(), MAX_PARTITION_DEPTH,
-      child(1)->row_desc().tuple_descriptors().size(), expr_mem_pool(), 
&ht_ctx_));
+      child(1)->row_desc()->tuple_descriptors().size(), expr_mem_pool(), 
&ht_ctx_));
   if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
     null_aware_eval_timer_ = ADD_TIMER(runtime_profile(), 
"NullAwareAntiJoinEvalTime");
   }
@@ -1248,8 +1248,8 @@ Status 
PartitionedHashJoinNode::CodegenCreateOutputRow(LlvmCodeGen* codegen,
   Value* probe_row_arg = builder.CreateBitCast(args[2], 
tuple_row_working_type, "probe");
   Value* build_row_arg = builder.CreateBitCast(args[3], 
tuple_row_working_type, "build");
 
-  int num_probe_tuples = child(0)->row_desc().tuple_descriptors().size();
-  int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
+  int num_probe_tuples = child(0)->row_desc()->tuple_descriptors().size();
+  int num_build_tuples = child(1)->row_desc()->tuple_descriptors().size();
 
   // Copy probe row
   codegen->CodegenMemcpy(&builder, out_row_arg, probe_row_arg, 
probe_tuple_row_size_);
@@ -1421,7 +1421,7 @@ Status PartitionedHashJoinNode::CodegenProcessProbeBatch(
   // Replace hash-table parameters with constants.
   HashTableCtx::HashTableReplacedConstants replaced_constants;
   const bool stores_duplicates = true;
-  const int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
+  const int num_build_tuples = 
child(1)->row_desc()->tuple_descriptors().size();
   RETURN_IF_ERROR(ht_ctx_->ReplaceHashTableConstants(codegen, 
stores_duplicates,
       num_build_tuples, process_probe_batch_fn, &replaced_constants));
   DCHECK_GE(replaced_constants.stores_nulls, 1);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/plan-root-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index eeece0f..1cba3d5 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -34,8 +34,7 @@ namespace impala {
 
 const string PlanRootSink::NAME = "PLAN_ROOT_SINK";
 
-PlanRootSink::PlanRootSink(const RowDescriptor& row_desc)
-  : DataSink(row_desc) { }
+PlanRootSink::PlanRootSink(const RowDescriptor* row_desc) : DataSink(row_desc) 
{}
 
 namespace {
 
@@ -66,7 +65,7 @@ void ValidateCollectionSlots(const RowDescriptor& row_desc, 
RowBatch* batch) {
 }
 
 Status PlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
-  ValidateCollectionSlots(row_desc_, batch);
+  ValidateCollectionSlots(*row_desc_, batch);
   int current_batch_row = 0;
 
   // Don't enter the loop if batch->num_rows() == 0; no point triggering the 
consumer with

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/plan-root-sink.h
----------------------------------------------------------------------
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index c45d327..654bd27 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -57,7 +57,7 @@ class ScalarExprEvaluator;
 /// and consumer. See IMPALA-4268.
 class PlanRootSink : public DataSink {
  public:
-  PlanRootSink(const RowDescriptor& row_desc);
+  PlanRootSink(const RowDescriptor* row_desc);
 
   virtual std::string GetName() { return NAME; }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/row-batch-cache.h
----------------------------------------------------------------------
diff --git a/be/src/exec/row-batch-cache.h b/be/src/exec/row-batch-cache.h
index 83d0ae9..ceb0260 100644
--- a/be/src/exec/row-batch-cache.h
+++ b/be/src/exec/row-batch-cache.h
@@ -34,7 +34,7 @@ class MemTracker;
 /// Simple cache of row batches.
 class RowBatchCache {
  public:
-  RowBatchCache(const RowDescriptor& row_desc, int batch_size)
+  RowBatchCache(const RowDescriptor* row_desc, int batch_size)
     : row_desc_(row_desc), batch_size_(batch_size), next_row_batch_idx_(0) {}
 
   ~RowBatchCache() { DCHECK_EQ(0, row_batches_.size()); }
@@ -72,7 +72,7 @@ class RowBatchCache {
 
  private:
   /// Parameters needed for creating row batches.
-  const RowDescriptor& row_desc_;
+  const RowDescriptor* row_desc_;
   int batch_size_;
 
   /// List of cached row-batch objects. The row-batch objects are owned by 
this cache.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/row-batch-list-test.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/row-batch-list-test.cc 
b/be/src/exec/row-batch-list-test.cc
index ad3317b..22aa97a 100644
--- a/be/src/exec/row-batch-list-test.cc
+++ b/be/src/exec/row-batch-list-test.cc
@@ -63,7 +63,7 @@ class RowBatchListTest : public testing::Test {
 
   RowBatch* CreateRowBatch(int start, int end) {
     int num_rows = end - start + 1;
-    RowBatch* batch = pool_.Add(new RowBatch(*desc_, num_rows, &tracker_));
+    RowBatch* batch = pool_.Add(new RowBatch(desc_, num_rows, &tracker_));
     int32_t* tuple_mem = reinterpret_cast<int32_t*>(
         batch->tuple_data_pool()->Allocate(sizeof(int32_t) * num_rows));
 
@@ -113,7 +113,7 @@ TEST_F(RowBatchListTest, BasicTest) {
 TEST_F(RowBatchListTest, EmptyBatchTest) {
   const int ALLOC_SIZE = 128;
   RowBatchList row_list;
-  RowBatch* batch1 = pool_.Add(new RowBatch(*desc_, 1, &tracker_));
+  RowBatch* batch1 = pool_.Add(new RowBatch(desc_, 1, &tracker_));
   batch1->tuple_data_pool()->Allocate(ALLOC_SIZE);
   DCHECK_EQ(ALLOC_SIZE, batch1->tuple_data_pool()->total_allocated_bytes());
 
@@ -123,7 +123,7 @@ TEST_F(RowBatchListTest, EmptyBatchTest) {
   EXPECT_TRUE(it.AtEnd());
 
   // IMPALA-4049: list should transfer resources attached to empty batch.
-  RowBatch* batch2 = pool_.Add(new RowBatch(*desc_, 1, &tracker_));
+  RowBatch* batch2 = pool_.Add(new RowBatch(desc_, 1, &tracker_));
   DCHECK_EQ(0, batch2->tuple_data_pool()->total_allocated_bytes());
   row_list.TransferResourceOwnership(batch2);
   DCHECK_EQ(0, batch1->tuple_data_pool()->total_allocated_bytes());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/sort-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 6d93130..552ee89 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -42,7 +42,7 @@ Status SortNode::Init(const TPlanNode& tnode, RuntimeState* 
state) {
       state, &ordering_exprs_));
   DCHECK(tsort_info.__isset.sort_tuple_slot_exprs);
   RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.sort_tuple_slot_exprs,
-      child(0)->row_desc(), state, &sort_tuple_exprs_));
+      *child(0)->row_desc(), state, &sort_tuple_exprs_));
   is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
   nulls_first_ = tnode.sort_node.sort_info.nulls_first;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/topn-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/topn-node.cc b/be/src/exec/topn-node.cc
index 73a8108..3b38e9d 100644
--- a/be/src/exec/topn-node.cc
+++ b/be/src/exec/topn-node.cc
@@ -58,7 +58,7 @@ Status TopNNode::Init(const TPlanNode& tnode, RuntimeState* 
state) {
       state, &ordering_exprs_));
   DCHECK(tsort_info.__isset.sort_tuple_slot_exprs);
   RETURN_IF_ERROR(ScalarExpr::Create(tsort_info.sort_tuple_slot_exprs,
-      child(0)->row_desc(), state, &output_tuple_exprs_));
+      *child(0)->row_desc(), state, &output_tuple_exprs_));
   is_asc_order_ = tnode.sort_node.sort_info.is_asc_order;
   nulls_first_ = tnode.sort_node.sort_info.nulls_first;
   DCHECK_EQ(conjuncts_.size(), 0)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/union-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/union-node.cc b/be/src/exec/union-node.cc
index d8707da..b3681be 100644
--- a/be/src/exec/union-node.cc
+++ b/be/src/exec/union-node.cc
@@ -53,7 +53,7 @@ Status UnionNode::Init(const TPlanNode& tnode, RuntimeState* 
state) {
   const vector<vector<TExpr>>& const_texpr_lists = 
tnode.union_node.const_expr_lists;
   for (const vector<TExpr>& texprs : const_texpr_lists) {
     vector<ScalarExpr*> const_exprs;
-    RETURN_IF_ERROR(ScalarExpr::Create(texprs, row_desc(), state, 
&const_exprs));
+    RETURN_IF_ERROR(ScalarExpr::Create(texprs, *row_desc(), state, 
&const_exprs));
     DCHECK_EQ(const_exprs.size(), tuple_desc_->slots().size());
     const_exprs_lists_.push_back(const_exprs);
   }
@@ -63,7 +63,7 @@ Status UnionNode::Init(const TPlanNode& tnode, RuntimeState* 
state) {
     const vector<TExpr>& texprs = thrift_result_exprs[i];
     vector<ScalarExpr*> child_exprs;
     RETURN_IF_ERROR(
-        ScalarExpr::Create(texprs, child(i)->row_desc(), state, &child_exprs));
+        ScalarExpr::Create(texprs, *child(i)->row_desc(), state, 
&child_exprs));
     child_exprs_lists_.push_back(child_exprs);
     DCHECK_EQ(child_exprs.size(), tuple_desc_->slots().size());
   }
@@ -164,7 +164,7 @@ Status UnionNode::GetNextPassThrough(RuntimeState* state, 
RowBatch* row_batch) {
   DCHECK(!IsInSubplan());
   DCHECK_LT(child_idx_, children_.size());
   DCHECK(IsChildPassthrough(child_idx_));
-  DCHECK(child(child_idx_)->row_desc().LayoutEquals(row_batch->row_desc()));
+  DCHECK(child(child_idx_)->row_desc()->LayoutEquals(*row_batch->row_desc()));
   if (child_eos_) RETURN_IF_ERROR(child(child_idx_)->Open(state));
   DCHECK_EQ(row_batch->num_rows(), 0);
   RETURN_IF_ERROR(child(child_idx_)->GetNext(state, row_batch, &child_eos_));

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/exec/unnest-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/unnest-node.cc b/be/src/exec/unnest-node.cc
index 5bde655..2c02ff7 100644
--- a/be/src/exec/unnest-node.cc
+++ b/be/src/exec/unnest-node.cc
@@ -59,7 +59,7 @@ Status UnnestNode::Init(const TPlanNode& tnode, RuntimeState* 
state) {
 Status UnnestNode::InitCollExpr(RuntimeState* state) {
   DCHECK(containing_subplan_ != nullptr)
       << "set_containing_subplan() must have been called";
-  const RowDescriptor& row_desc = containing_subplan_->child(0)->row_desc();
+  const RowDescriptor& row_desc = *containing_subplan_->child(0)->row_desc();
   RETURN_IF_ERROR(ScalarExpr::Create(thrift_coll_expr_, row_desc, state, 
&coll_expr_));
   DCHECK(coll_expr_->IsSlotRef());
   return Status::OK();
@@ -78,8 +78,8 @@ Status UnnestNode::Prepare(RuntimeState* state) {
   num_collections_counter_ =
       ADD_COUNTER(runtime_profile_, "NumCollections", TUnit::UNIT);
 
-  DCHECK_EQ(1, row_desc().tuple_descriptors().size());
-  const TupleDescriptor* item_tuple_desc = row_desc().tuple_descriptors()[0];
+  DCHECK_EQ(1, row_desc()->tuple_descriptors().size());
+  const TupleDescriptor* item_tuple_desc = row_desc()->tuple_descriptors()[0];
   DCHECK(item_tuple_desc != nullptr);
   item_byte_size_ = item_tuple_desc->byte_size();
 
@@ -92,8 +92,8 @@ Status UnnestNode::Prepare(RuntimeState* state) {
   const SlotRef* slot_ref = static_cast<SlotRef*>(coll_expr_);
   coll_slot_desc_ = state->desc_tbl().GetSlotDescriptor(slot_ref->slot_id());
   DCHECK(coll_slot_desc_ != nullptr);
-  const RowDescriptor& row_desc = containing_subplan_->child(0)->row_desc();
-  coll_tuple_idx_ = row_desc.GetTupleIdx(coll_slot_desc_->parent()->id());
+  const RowDescriptor* row_desc = containing_subplan_->child(0)->row_desc();
+  coll_tuple_idx_ = row_desc->GetTupleIdx(coll_slot_desc_->parent()->id());
 
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/buffered-tuple-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc 
b/be/src/runtime/buffered-tuple-stream-test.cc
index 3c50ec5..0904833 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -136,16 +136,16 @@ class SimpleTupleStreamTest : public testing::Test {
   /// 'offset' is used to account for rows occupied by any previous row 
batches. This is
   /// needed to match the values generated in VerifyResults(). If 'gen_null' 
is true,
   /// some tuples will be set to NULL.
-  virtual RowBatch* CreateBatch(const RowDescriptor& row_desc, int offset,
-      int num_rows, bool gen_null) {
+  virtual RowBatch* CreateBatch(
+      const RowDescriptor* row_desc, int offset, int num_rows, bool gen_null) {
     RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, &tracker_));
-    int num_tuples = row_desc.tuple_descriptors().size();
+    int num_tuples = row_desc->tuple_descriptors().size();
 
-    int idx = offset * CountSlotsPerRow(row_desc);
+    int idx = offset * CountSlotsPerRow(*row_desc);
     for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
       TupleRow* row = batch->GetRow(row_idx);
       for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
-        TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[tuple_idx];
+        TupleDescriptor* tuple_desc = row_desc->tuple_descriptors()[tuple_idx];
         Tuple* tuple = Tuple::Create(tuple_desc->byte_size(), 
batch->tuple_data_pool());
         bool is_null = gen_null && !GenBoolValue(idx);
         for (int slot_idx = 0; slot_idx < tuple_desc->slots().size(); 
++slot_idx, ++idx) {
@@ -175,11 +175,11 @@ class SimpleTupleStreamTest : public testing::Test {
   }
 
   virtual RowBatch* CreateIntBatch(int offset, int num_rows, bool gen_null) {
-    return CreateBatch(*int_desc_, offset, num_rows, gen_null);
+    return CreateBatch(int_desc_, offset, num_rows, gen_null);
   }
 
   virtual RowBatch* CreateStringBatch(int offset, int num_rows, bool gen_null) 
{
-    return CreateBatch(*string_desc_, offset, num_rows, gen_null);
+    return CreateBatch(string_desc_, offset, num_rows, gen_null);
   }
 
   void AppendValue(uint8_t* ptr, vector<int>* results) {
@@ -228,7 +228,7 @@ class SimpleTupleStreamTest : public testing::Test {
   void ReadValues(BufferedTupleStream* stream, RowDescriptor* desc, vector<T>* 
results,
       int num_batches = -1) {
     bool eos = false;
-    RowBatch batch(*desc, BATCH_SIZE, &tracker_);
+    RowBatch batch(desc, BATCH_SIZE, &tracker_);
     int batches_read = 0;
     do {
       batch.Reset();
@@ -286,8 +286,8 @@ class SimpleTupleStreamTest : public testing::Test {
   template <typename T>
   void TestValues(int num_batches, RowDescriptor* desc, bool gen_null,
       bool unpin_stream, int num_rows = BATCH_SIZE, bool use_small_buffers = 
true) {
-    BufferedTupleStream stream(runtime_state_, *desc, 
runtime_state_->block_mgr(),
-        client_, use_small_buffers, false);
+    BufferedTupleStream stream(runtime_state_, desc, 
runtime_state_->block_mgr(), client_,
+        use_small_buffers, false);
     ASSERT_OK(stream.Init(-1, NULL, true));
     bool got_write_buffer;
     ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
@@ -303,7 +303,7 @@ class SimpleTupleStreamTest : public testing::Test {
 
       Status status;
       ASSERT_TRUE(sizeof(T) == sizeof(int) || sizeof(T) == 
sizeof(StringValue));
-      batch = CreateBatch(*desc, offset, num_rows, gen_null);
+      batch = CreateBatch(desc, offset, num_rows, gen_null);
       for (int j = 0; j < batch->num_rows(); ++j) {
         bool b = stream.AddRow(batch->GetRow(j), &status);
         ASSERT_OK(status);
@@ -339,8 +339,8 @@ class SimpleTupleStreamTest : public testing::Test {
   void TestIntValuesInterleaved(int num_batches, int num_batches_before_read,
       bool unpin_stream) {
     for (int small_buffers = 0; small_buffers < 2; ++small_buffers) {
-      BufferedTupleStream stream(runtime_state_, *int_desc_, 
runtime_state_->block_mgr(),
-          client_, small_buffers == 0,  // initial small buffers
+      BufferedTupleStream stream(runtime_state_, int_desc_, 
runtime_state_->block_mgr(),
+          client_, small_buffers == 0, // initial small buffers
           true); // read_write
       ASSERT_OK(stream.Init(-1, NULL, true));
       bool got_write_buffer;
@@ -563,8 +563,8 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data) {
   InitBlockMgr(3 * buffer_size, buffer_size);
   RowDescriptor* row_desc = varlen_data ? string_desc_ : int_desc_;
 
-  BufferedTupleStream stream(runtime_state_, *row_desc, 
runtime_state_->block_mgr(),
-      client_, true, false);
+  BufferedTupleStream stream(
+      runtime_state_, row_desc, runtime_state_->block_mgr(), client_, true, 
false);
   ASSERT_OK(stream.Init(-1, NULL, true));
   bool got_write_buffer;
   ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
@@ -635,8 +635,8 @@ TEST_F(SimpleTupleStreamTest, SmallBuffers) {
   int buffer_size = IO_BLOCK_SIZE;
   InitBlockMgr(2 * buffer_size, buffer_size);
 
-  BufferedTupleStream stream(runtime_state_, *int_desc_, 
runtime_state_->block_mgr(),
-      client_, true, false);
+  BufferedTupleStream stream(
+      runtime_state_, int_desc_, runtime_state_->block_mgr(), client_, true, 
false);
   ASSERT_OK(stream.Init(-1, NULL, false));
   bool got_write_buffer;
   ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
@@ -685,8 +685,8 @@ void SimpleTupleStreamTest::TestTransferMemory(bool 
pin_stream, bool read_write)
   int buffer_size = 4 * 1024;
   InitBlockMgr(100 * buffer_size, buffer_size);
 
-  BufferedTupleStream stream(runtime_state_, *int_desc_, 
runtime_state_->block_mgr(),
-      client_, false, read_write);
+  BufferedTupleStream stream(
+      runtime_state_, int_desc_, runtime_state_->block_mgr(), client_, false, 
read_write);
   ASSERT_OK(stream.Init(-1, NULL, pin_stream));
   bool got_write_buffer;
   ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
@@ -759,7 +759,7 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
     external_slots.insert(tuple_desc.string_slots()[i]->id());
   }
 
-  BufferedTupleStream stream(runtime_state_, *string_desc_, 
runtime_state_->block_mgr(),
+  BufferedTupleStream stream(runtime_state_, string_desc_, 
runtime_state_->block_mgr(),
       client_, true, false, external_slots);
   for (int i = 0; i < num_batches; ++i) {
     RowBatch* batch = CreateStringBatch(rows_added, BATCH_SIZE, false);
@@ -824,7 +824,7 @@ TEST_F(SimpleTupleStreamTest, BigRow) {
   RowDescriptor* nullable_row_desc = pool_.Add(new RowDescriptor(
       *desc, tuple_ids, nullable_tuples));
   ASSERT_TRUE(nullable_row_desc->IsAnyTupleNullable());
-  BufferedTupleStream nullable_stream(runtime_state_, *nullable_row_desc,
+  BufferedTupleStream nullable_stream(runtime_state_, nullable_row_desc,
       runtime_state_->block_mgr(), client_, false, false);
   Status status = nullable_stream.Init(-1, NULL, true);
   ASSERT_FALSE(status.ok());
@@ -834,8 +834,8 @@ TEST_F(SimpleTupleStreamTest, BigRow) {
 // Test for IMPALA-3923: overflow of 32-bit int in GetRows().
 TEST_F(SimpleTupleStreamTest, TestGetRowsOverflow) {
   InitBlockMgr(-1, 8 * 1024 * 1024);
-  BufferedTupleStream stream(runtime_state_, *int_desc_, 
runtime_state_->block_mgr(),
-      client_, false, false);
+  BufferedTupleStream stream(
+      runtime_state_, int_desc_, runtime_state_->block_mgr(), client_, false, 
false);
   ASSERT_OK(stream.Init(-1, NULL, true));
 
   Status status;
@@ -926,8 +926,8 @@ TEST_F(MultiTupleStreamTest, MultiTupleAllocateRow) {
 
   int num_batches = 1;
   int rows_added = 0;
-  BufferedTupleStream stream(runtime_state_, *string_desc_, 
runtime_state_->block_mgr(),
-      client_, false, false);
+  BufferedTupleStream stream(
+      runtime_state_, string_desc_, runtime_state_->block_mgr(), client_, 
false, false);
   ASSERT_OK(stream.Init(-1, NULL, false));
   bool got_write_buffer;
   ASSERT_OK(stream.PrepareForWrite(&got_write_buffer));
@@ -1032,7 +1032,7 @@ TEST_F(MultiNullableTupleStreamTest, TestComputeRowSize) {
   const SlotDescriptor* external_string_slot = tuple_descs[1]->slots()[0];
   external_slots.insert(external_string_slot->id());
 
-  BufferedTupleStream stream(runtime_state_, *string_desc_, 
runtime_state_->block_mgr(),
+  BufferedTupleStream stream(runtime_state_, string_desc_, 
runtime_state_->block_mgr(),
       client_, false, false, external_slots);
   gscoped_ptr<TupleRow, FreeDeleter> row(reinterpret_cast<TupleRow*>(
         malloc(tuple_descs.size() * sizeof(Tuple*))));
@@ -1079,8 +1079,8 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
   Status status;
   InitBlockMgr(-1, IO_BLOCK_SIZE);
   const int NUM_ROWS = 4000;
-  BufferedTupleStream stream(runtime_state_, *array_desc_, 
runtime_state_->block_mgr(),
-      client_, false, false);
+  BufferedTupleStream stream(
+      runtime_state_, array_desc_, runtime_state_->block_mgr(), client_, 
false, false);
   const vector<TupleDescriptor*>& tuple_descs = 
array_desc_->tuple_descriptors();
   // Write out a predictable pattern of data by iterating over arrays of 
constants.
   int strings_index = 0; // we take the mod of this as index into STRINGS.
@@ -1148,7 +1148,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
   array_len_index = 0;
   bool eos = false;
   int rows_read = 0;
-  RowBatch batch(*array_desc_, BATCH_SIZE, &tracker_);
+  RowBatch batch(array_desc_, BATCH_SIZE, &tracker_);
   do {
     batch.Reset();
     ASSERT_OK(stream.GetNext(&batch, &eos));
@@ -1191,7 +1191,7 @@ TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
   const SlotDescriptor* external_array_slot = tuple_descs[0]->slots()[1];
   external_slots.insert(external_array_slot->id());
 
-  BufferedTupleStream stream(runtime_state_, *array_desc_, 
runtime_state_->block_mgr(),
+  BufferedTupleStream stream(runtime_state_, array_desc_, 
runtime_state_->block_mgr(),
       client_, false, false, external_slots);
   gscoped_ptr<TupleRow, FreeDeleter> row(reinterpret_cast<TupleRow*>(
         malloc(tuple_descs.size() * sizeof(Tuple*))));

Reply via email to