Repository: incubator-impala
Updated Branches:
  refs/heads/master 352ad55db -> 317c413a0


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/buffered-tuple-stream-v2-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2-test.cc b/be/src/runtime/buffered-tuple-stream-v2-test.cc
index 37ddad7..277a564 100644
--- a/be/src/runtime/buffered-tuple-stream-v2-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2-test.cc
@@ -167,15 +167,15 @@ class SimpleTupleStreamTest : public testing::Test {
   /// needed to match the values generated in VerifyResults(). If 'gen_null' is true,
   /// some tuples will be set to NULL.
   virtual RowBatch* CreateBatch(
-      const RowDescriptor& row_desc, int offset, int num_rows, bool gen_null) {
+      const RowDescriptor* row_desc, int offset, int num_rows, bool gen_null) {
     RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, &tracker_));
-    int num_tuples = row_desc.tuple_descriptors().size();
+    int num_tuples = row_desc->tuple_descriptors().size();
 
-    int idx = offset * CountSlotsPerRow(row_desc);
+    int idx = offset * CountSlotsPerRow(*row_desc);
     for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
       TupleRow* row = batch->GetRow(row_idx);
       for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
-        TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[tuple_idx];
+        TupleDescriptor* tuple_desc = row_desc->tuple_descriptors()[tuple_idx];
         Tuple* tuple = Tuple::Create(tuple_desc->byte_size(), batch->tuple_data_pool());
         bool is_null = gen_null && !GenBoolValue(idx);
         for (int slot_idx = 0; slot_idx < tuple_desc->slots().size(); ++slot_idx, ++idx) {
@@ -205,11 +205,11 @@ class SimpleTupleStreamTest : public testing::Test {
   }
 
   virtual RowBatch* CreateIntBatch(int offset, int num_rows, bool gen_null) {
-    return CreateBatch(*int_desc_, offset, num_rows, gen_null);
+    return CreateBatch(int_desc_, offset, num_rows, gen_null);
   }
 
   virtual RowBatch* CreateStringBatch(int offset, int num_rows, bool gen_null) {
-    return CreateBatch(*string_desc_, offset, num_rows, gen_null);
+    return CreateBatch(string_desc_, offset, num_rows, gen_null);
   }
 
   void AppendValue(uint8_t* ptr, vector<int>* results) {
@@ -258,7 +258,7 @@ class SimpleTupleStreamTest : public testing::Test {
   void ReadValues(BufferedTupleStreamV2* stream, RowDescriptor* desc, vector<T>* results,
       int num_batches = -1) {
     bool eos = false;
-    RowBatch batch(*desc, BATCH_SIZE, &tracker_);
+    RowBatch batch(desc, BATCH_SIZE, &tracker_);
     int batches_read = 0;
     do {
       batch.Reset();
@@ -319,7 +319,7 @@ class SimpleTupleStreamTest : public testing::Test {
     if (max_page_len == -1) max_page_len = default_page_len;
 
     BufferedTupleStreamV2 stream(
-        runtime_state_, *desc, &client_, default_page_len, max_page_len);
+        runtime_state_, desc, &client_, default_page_len, max_page_len);
     ASSERT_OK(stream.Init(-1, true));
     bool got_write_reservation;
     ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
@@ -335,7 +335,7 @@ class SimpleTupleStreamTest : public testing::Test {
 
       Status status;
       ASSERT_TRUE(sizeof(T) == sizeof(int) || sizeof(T) == sizeof(StringValue));
-      batch = CreateBatch(*desc, offset, num_rows, gen_null);
+      batch = CreateBatch(desc, offset, num_rows, gen_null);
       for (int j = 0; j < batch->num_rows(); ++j) {
         // TODO: test that AddRow succeeds after freeing memory.
         bool b = stream.AddRow(batch->GetRow(j), &status);
@@ -363,8 +363,7 @@ class SimpleTupleStreamTest : public testing::Test {
 
   void TestIntValuesInterleaved(int num_batches, int num_batches_before_read,
       bool unpin_stream, int64_t page_len = PAGE_LEN) {
-    BufferedTupleStreamV2 stream(
-        runtime_state_, *int_desc_, &client_, page_len, page_len);
+    BufferedTupleStreamV2 stream(runtime_state_, int_desc_, &client_, page_len, page_len);
     ASSERT_OK(stream.Init(-1, true));
     bool got_reservation;
     ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
@@ -606,7 +605,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
   RowDescriptor* row_desc = varlen_data ? string_desc_ : int_desc_;
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *row_desc, &client_, buffer_size, buffer_size);
+      runtime_state_, row_desc, &client_, buffer_size, buffer_size);
   ASSERT_OK(stream.Init(-1, true));
   if (read_write) {
     bool got_reservation = false;
@@ -701,7 +700,7 @@ void SimpleTupleStreamTest::TestTransferMemory(bool pin_stream, bool read_write)
   Init(100 * buffer_size);
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *int_desc_, &client_, buffer_size, buffer_size);
+      runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
   ASSERT_OK(stream.Init(-1, pin_stream));
   if (read_write) {
     bool got_reservation;
@@ -784,7 +783,7 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
   }
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *string_desc_, &client_, buffer_size, buffer_size, external_slots);
+      runtime_state_, string_desc_, &client_, buffer_size, buffer_size, external_slots);
   ASSERT_OK(stream.Init(0, false));
   bool got_reservation;
   ASSERT_OK(stream.PrepareForWrite(&got_reservation));
@@ -859,18 +858,18 @@ TEST_F(SimpleTupleStreamTest, BigRow) {
   // indicators in the stream so adding the row will fail.
   ASSERT_TRUE(nullable_big_row_desc_->IsAnyTupleNullable());
   BufferedTupleStreamV2 nullable_stream(
-      runtime_state_, *nullable_big_row_desc_, &client_, BIG_ROW_BYTES, BIG_ROW_BYTES);
+      runtime_state_, nullable_big_row_desc_, &client_, BIG_ROW_BYTES, BIG_ROW_BYTES);
   ASSERT_OK(nullable_stream.Init(-1, true));
   bool got_reservation;
   ASSERT_OK(nullable_stream.PrepareForWrite(&got_reservation));
 
   // With null tuples, a row can fit in the stream.
-  RowBatch* batch = CreateBatch(*nullable_big_row_desc_, 0, 1, true);
+  RowBatch* batch = CreateBatch(nullable_big_row_desc_, 0, 1, true);
   Status status;
   EXPECT_TRUE(nullable_stream.AddRow(batch->GetRow(0), &status));
   // With the additional null indicator, we can't fit all the tuples of a row into
   // the stream.
-  batch = CreateBatch(*nullable_big_row_desc_, 0, 1, false);
+  batch = CreateBatch(nullable_big_row_desc_, 0, 1, false);
   EXPECT_FALSE(nullable_stream.AddRow(batch->GetRow(0), &status));
   EXPECT_EQ(TErrorCode::MAX_ROW_SIZE, status.code());
   nullable_stream.Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
@@ -883,7 +882,7 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
   Init(MAX_BUFFERS * BIG_ROW_BYTES);
   Status status;
   BufferedTupleStreamV2 stream(
-      runtime_state_, *big_row_desc_, &client_, DEFAULT_PAGE_LEN, BIG_ROW_BYTES * 2);
+      runtime_state_, big_row_desc_, &client_, DEFAULT_PAGE_LEN, BIG_ROW_BYTES * 2);
   ASSERT_OK(stream.Init(-1, true));
   RowBatch* batch;
   bool got_reservation;
@@ -891,7 +890,7 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
   ASSERT_TRUE(got_reservation);
   // We should be able to append MAX_BUFFERS without problem.
   for (int i = 0; i < MAX_BUFFERS; ++i) {
-    batch = CreateBatch(*big_row_desc_, i, 1, false);
+    batch = CreateBatch(big_row_desc_, i, 1, false);
     bool success = stream.AddRow(batch->GetRow(0), &status);
     ASSERT_TRUE(success);
     // We should have one large page per row.
@@ -900,7 +899,7 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
   }
 
   // We can't fit another row in memory - need to unpin to make progress.
-  batch = CreateBatch(*big_row_desc_, MAX_BUFFERS, 1, false);
+  batch = CreateBatch(big_row_desc_, MAX_BUFFERS, 1, false);
   bool success = stream.AddRow(batch->GetRow(0), &status);
   ASSERT_FALSE(success);
   ASSERT_OK(status);
@@ -919,7 +918,7 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
 // Test for IMPALA-3923: overflow of 32-bit int in GetRows().
 TEST_F(SimpleTupleStreamTest, TestGetRowsOverflow) {
   Init(BUFFER_POOL_LIMIT);
-  BufferedTupleStreamV2 stream(runtime_state_, *int_desc_, &client_, PAGE_LEN, PAGE_LEN);
+  BufferedTupleStreamV2 stream(runtime_state_, int_desc_, &client_, PAGE_LEN, PAGE_LEN);
   ASSERT_OK(stream.Init(-1, true));
 
   Status status;
@@ -941,10 +940,10 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
   Init(MAX_BUFFERS * BIG_ROW_BYTES);
   Status status;
   BufferedTupleStreamV2 stream(
-      runtime_state_, *string_desc_, &client_, DEFAULT_PAGE_LEN, BIG_ROW_BYTES * 2);
+      runtime_state_, string_desc_, &client_, DEFAULT_PAGE_LEN, BIG_ROW_BYTES * 2);
   ASSERT_OK(stream.Init(-1, true));
-  RowBatch write_batch(*string_desc_, 1024, &tracker_);
-  RowBatch read_batch(*string_desc_, 1024, &tracker_);
+  RowBatch write_batch(string_desc_, 1024, &tracker_);
+  RowBatch read_batch(string_desc_, 1024, &tracker_);
   bool got_reservation;
   ASSERT_OK(stream.PrepareForReadWrite(false, &got_reservation));
   ASSERT_TRUE(got_reservation);
@@ -1109,7 +1108,7 @@ TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) {
   int num_batches = 1;
   int rows_added = 0;
   BufferedTupleStreamV2 stream(
-      runtime_state_, *string_desc_, &client_, buffer_size, buffer_size);
+      runtime_state_, string_desc_, &client_, buffer_size, buffer_size);
   ASSERT_OK(stream.Init(-1, false));
   bool got_write_reservation;
   ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
@@ -1229,7 +1228,7 @@ TEST_F(MultiNullableTupleStreamTest, TestComputeRowSize) {
   external_slots.insert(external_string_slot->id());
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *string_desc_, &client_, PAGE_LEN, PAGE_LEN, external_slots);
+      runtime_state_, string_desc_, &client_, PAGE_LEN, PAGE_LEN, external_slots);
   gscoped_ptr<TupleRow, FreeDeleter> row(
       reinterpret_cast<TupleRow*>(malloc(tuple_descs.size() * sizeof(Tuple*))));
   gscoped_ptr<Tuple, FreeDeleter> tuple0(
@@ -1279,8 +1278,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
   Status status;
   Init(BUFFER_POOL_LIMIT);
   const int NUM_ROWS = 4000;
-  BufferedTupleStreamV2 stream(
-      runtime_state_, *array_desc_, &client_, PAGE_LEN, PAGE_LEN);
+  BufferedTupleStreamV2 stream(runtime_state_, array_desc_, &client_, PAGE_LEN, PAGE_LEN);
   const vector<TupleDescriptor*>& tuple_descs = array_desc_->tuple_descriptors();
   // Write out a predictable pattern of data by iterating over arrays of constants.
   int strings_index = 0; // we take the mod of this as index into STRINGS.
@@ -1350,7 +1348,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
   array_len_index = 0;
   bool eos = false;
   int rows_read = 0;
-  RowBatch batch(*array_desc_, BATCH_SIZE, &tracker_);
+  RowBatch batch(array_desc_, BATCH_SIZE, &tracker_);
   do {
     batch.Reset();
     ASSERT_OK(stream.GetNext(&batch, &eos));
@@ -1394,7 +1392,7 @@ TEST_F(ArrayTupleStreamTest, TestComputeRowSize) {
   external_slots.insert(external_array_slot->id());
 
   BufferedTupleStreamV2 stream(
-      runtime_state_, *array_desc_, &client_, PAGE_LEN, PAGE_LEN, external_slots);
+      runtime_state_, array_desc_, &client_, PAGE_LEN, PAGE_LEN, external_slots);
   gscoped_ptr<TupleRow, FreeDeleter> row(
       reinterpret_cast<TupleRow*>(malloc(tuple_descs.size() * sizeof(Tuple*))));
   gscoped_ptr<Tuple, FreeDeleter> tuple0(
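
The test changes above show the conversion in miniature: call sites that previously dereferenced a RowDescriptor* member now pass the pointer straight through. A minimal sketch of the new construction pattern, assuming 'desc_tbl', 'tuple_ids' and 'nullable_tuples' are set up as in row-batch-test.cc later in this commit (the surrounding names are illustrative, not taken from the patch):

    // The caller owns the descriptor; streams and batches only borrow it, so
    // 'row_desc' must outlive every object that holds the pointer.
    RowDescriptor row_desc(*desc_tbl, tuple_ids, nullable_tuples);
    BufferedTupleStreamV2 stream(state, &row_desc, &client, page_len, page_len);
    RowBatch batch(&row_desc, batch_size, &tracker);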

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/buffered-tuple-stream-v2.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc
index 424678b..82da2bc 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2.cc
@@ -47,7 +47,7 @@ using namespace strings;
 using BufferHandle = BufferPool::BufferHandle;
 
 BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
-    const RowDescriptor& row_desc, BufferPool::ClientHandle* buffer_pool_client,
+    const RowDescriptor* row_desc, BufferPool::ClientHandle* buffer_pool_client,
     int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots)
   : state_(state),
     desc_(row_desc),
@@ -70,7 +70,7 @@ BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
     num_rows_(0),
     default_page_len_(default_page_len),
     max_page_len_(max_page_len),
-    has_nullable_tuple_(row_desc.IsAnyTupleNullable()),
+    has_nullable_tuple_(row_desc->IsAnyTupleNullable()),
     delete_on_read_(false),
     closed_(false),
     pinned_(true) {
@@ -78,8 +78,8 @@ BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
   DCHECK(BitUtil::IsPowerOf2(default_page_len)) << default_page_len;
   DCHECK(BitUtil::IsPowerOf2(max_page_len)) << max_page_len;
   read_page_ = pages_.end();
-  for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) {
-    const TupleDescriptor* tuple_desc = desc_.tuple_descriptors()[i];
+  for (int i = 0; i < desc_->tuple_descriptors().size(); ++i) {
+    const TupleDescriptor* tuple_desc = desc_->tuple_descriptors()[i];
     const int tuple_byte_size = tuple_desc->byte_size();
     fixed_tuple_sizes_.push_back(tuple_byte_size);
 
@@ -704,7 +704,7 @@ template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE>
 Status BufferedTupleStreamV2::GetNextInternal(
     RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
   DCHECK(!closed_);
-  DCHECK(batch->row_desc().Equals(desc_));
+  DCHECK(batch->row_desc()->Equals(*desc_));
   DCHECK(is_pinned() || !FILL_FLAT_ROWS)
       << "FlatRowPtrs are only valid for pinned streams";
   *eos = (rows_returned_ == num_rows_);
@@ -739,7 +739,7 @@ Status BufferedTupleStreamV2::GetNextInternal(
     flat_rows->reserve(rows_to_fill);
   }
 
-  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
+  const uint64_t tuples_per_row = desc_->tuple_descriptors().size();
   // Start reading from the current position in 'read_page_'.
   for (int i = 0; i < rows_to_fill; ++i) {
     if (FILL_FLAT_ROWS) {
@@ -921,7 +921,7 @@ template <bool HAS_NULLABLE_TUPLE>
 bool BufferedTupleStreamV2::DeepCopyInternal(
     TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept {
   uint8_t* pos = *data;
-  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
+  const uint64_t tuples_per_row = desc_->tuple_descriptors().size();
   // Copy the not NULL fixed len tuples. For the NULL tuples just update the NULL tuple
   // indicator.
   if (HAS_NULLABLE_TUPLE) {
@@ -1038,7 +1038,7 @@ void BufferedTupleStreamV2::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) cons
 
 template <bool HAS_NULLABLE_TUPLE>
 void BufferedTupleStreamV2::UnflattenTupleRow(uint8_t** data, TupleRow* row) const {
-  const int tuples_per_row = desc_.tuple_descriptors().size();
+  const int tuples_per_row = desc_->tuple_descriptors().size();
   uint8_t* ptr = *data;
   if (has_nullable_tuple_) {
     // Stitch together the tuples from the page and the NULL ones.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/buffered-tuple-stream-v2.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h
index 4376c11..c06dc6c 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.h
+++ b/be/src/runtime/buffered-tuple-stream-v2.h
@@ -208,7 +208,7 @@ class BufferedTupleStreamV2 {
   /// that are added and the rows being returned.
   /// page_len: the size of pages to use in the stream
   /// ext_varlen_slots: set of varlen slots with data stored externally to the stream
-  BufferedTupleStreamV2(RuntimeState* state, const RowDescriptor& row_desc,
+  BufferedTupleStreamV2(RuntimeState* state, const RowDescriptor* row_desc,
       BufferPool::ClientHandle* buffer_pool_client, int64_t default_page_len,
       int64_t max_page_len,
       const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
@@ -406,7 +406,7 @@ class BufferedTupleStreamV2 {
   RuntimeState* const state_;
 
   /// Description of rows stored in the stream.
-  const RowDescriptor& desc_;
+  const RowDescriptor* desc_;
 
   /// Plan node ID, used for error reporting.
   int node_id_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/buffered-tuple-stream.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index bfe933c..cce6390 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -47,7 +47,7 @@ string BufferedTupleStream::RowIdx::DebugString() const {
 }
 
 BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
-    const RowDescriptor& row_desc, BufferedBlockMgr* block_mgr,
+    const RowDescriptor* row_desc, BufferedBlockMgr* block_mgr,
     BufferedBlockMgr::Client* client, bool use_initial_small_buffers, bool read_write,
     const set<SlotId>& ext_varlen_slots)
   : state_(state),
@@ -71,7 +71,7 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
     unpin_timer_(NULL),
     get_new_block_timer_(NULL),
     read_write_(read_write),
-    has_nullable_tuple_(row_desc.IsAnyTupleNullable()),
+    has_nullable_tuple_(row_desc->IsAnyTupleNullable()),
     use_small_buffers_(use_initial_small_buffers),
     delete_on_read_(false),
     closed_(false),
@@ -81,8 +81,8 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
   max_null_indicators_size_ = -1;
   read_block_ = blocks_.end();
   fixed_tuple_row_size_ = 0;
-  for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) {
-    const TupleDescriptor* tuple_desc = desc_.tuple_descriptors()[i];
+  for (int i = 0; i < desc_->tuple_descriptors().size(); ++i) {
+    const TupleDescriptor* tuple_desc = desc_->tuple_descriptors()[i];
     const int tuple_byte_size = tuple_desc->byte_size();
     fixed_tuple_sizes_.push_back(tuple_byte_size);
     fixed_tuple_row_size_ += tuple_byte_size;
@@ -157,7 +157,8 @@ Status BufferedTupleStream::Init(int node_id, RuntimeProfile* profile, bool pinn
   max_null_indicators_size_ = ComputeNumNullIndicatorBytes(block_mgr_->max_block_size());
   if (UNLIKELY(max_null_indicators_size_ < 0)) {
     // The block cannot even fit in a row of tuples so just assume there is one row.
-    int null_indicators_size = BitUtil::RoundUpNumi64(desc_.tuple_descriptors().size()) * 8;
+    int null_indicators_size =
+        BitUtil::RoundUpNumi64(desc_->tuple_descriptors().size()) * 8;
     return Status(TErrorCode::BTS_BLOCK_OVERFLOW,
         PrettyPrinter::Print(fixed_tuple_row_size_, TUnit::BYTES),
         PrettyPrinter::Print(null_indicators_size,  TUnit::BYTES));
@@ -489,7 +490,7 @@ int BufferedTupleStream::ComputeNumNullIndicatorBytes(int block_size) const {
   if (has_nullable_tuple_) {
     // We assume that all rows will use their max size, so we may be underutilizing the
     // space, i.e. we may have some unused space in case of rows with NULL tuples.
-    const uint32_t tuples_per_row = desc_.tuple_descriptors().size();
+    const uint32_t tuples_per_row = desc_->tuple_descriptors().size();
     const uint32_t min_row_size_in_bits = 8 * fixed_tuple_row_size_ + tuples_per_row;
     const uint32_t block_size_in_bits = 8 * block_size;
     const uint32_t max_num_rows = block_size_in_bits / min_row_size_in_bits;
@@ -547,12 +548,12 @@ template <bool FILL_INDICES, bool HAS_NULLABLE_TUPLE>
 Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
     vector<RowIdx>* indices) {
   DCHECK(!closed_);
-  DCHECK(batch->row_desc().LayoutEquals(desc_));
+  DCHECK(batch->row_desc()->LayoutEquals(*desc_));
   *eos = (rows_returned_ == num_rows_);
   if (*eos) return Status::OK();
   DCHECK_GE(read_block_null_indicators_size_, 0);
 
-  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
+  const uint64_t tuples_per_row = desc_->tuple_descriptors().size();
   DCHECK_LE(read_tuple_idx_ / tuples_per_row, (*read_block_)->num_rows());
   DCHECK_EQ(read_tuple_idx_ % tuples_per_row, 0);
   int rows_returned_curr_block = read_tuple_idx_ / tuples_per_row;
@@ -762,7 +763,7 @@ bool BufferedTupleStream::DeepCopyInternal(TupleRow* row) noexcept {
   DCHECK(write_block_->is_pinned()) << DebugString() << std::endl
       << write_block_->DebugString();
 
-  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
+  const uint64_t tuples_per_row = desc_->tuple_descriptors().size();
   uint32_t bytes_remaining = write_block_bytes_remaining();
   if (UNLIKELY((bytes_remaining < fixed_tuple_row_size_) ||
               (HasNullableTuple &&
@@ -882,7 +883,7 @@ void BufferedTupleStream::GetTupleRow(const RowIdx& idx, TupleRow* row) const {
   uint8_t* data = block_start_idx_[idx.block()] + idx.offset();
   if (has_nullable_tuple_) {
     // Stitch together the tuples from the block and the NULL ones.
-    const int tuples_per_row = desc_.tuple_descriptors().size();
+    const int tuples_per_row = desc_->tuple_descriptors().size();
     uint32_t tuple_idx = idx.idx() * tuples_per_row;
     for (int i = 0; i < tuples_per_row; ++i) {
       const uint8_t* null_word = block_start_idx_[idx.block()] + (tuple_idx >> 3);
@@ -890,13 +891,13 @@ void BufferedTupleStream::GetTupleRow(const RowIdx& idx, TupleRow* row) const {
       const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
       row->SetTuple(i, reinterpret_cast<Tuple*>(
           reinterpret_cast<uint64_t>(data) * is_not_null));
-      data += desc_.tuple_descriptors()[i]->byte_size() * is_not_null;
+      data += desc_->tuple_descriptors()[i]->byte_size() * is_not_null;
       ++tuple_idx;
     }
   } else {
-    for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) {
+    for (int i = 0; i < desc_->tuple_descriptors().size(); ++i) {
       row->SetTuple(i, reinterpret_cast<Tuple*>(data));
-      data += desc_.tuple_descriptors()[i]->byte_size();
+      data += desc_->tuple_descriptors()[i]->byte_size();
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/buffered-tuple-stream.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index ea5e8b8..41d63bf 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -214,7 +214,7 @@ class BufferedTupleStream {
   /// read_write: Stream allows interchanging read and write operations. Requires at
   /// least two blocks may be pinned.
   /// ext_varlen_slots: set of varlen slots with data stored externally to the stream
-  BufferedTupleStream(RuntimeState* state, const RowDescriptor& row_desc,
+  BufferedTupleStream(RuntimeState* state, const RowDescriptor* row_desc,
       BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client,
       bool use_initial_small_buffers, bool read_write,
       const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
@@ -350,7 +350,7 @@ class BufferedTupleStream {
   RuntimeState* const state_;
 
   /// Description of rows stored in the stream.
-  const RowDescriptor& desc_;
+  const RowDescriptor* desc_;
 
   /// Sum of the fixed length portion of all the tuples in desc_.
   int fixed_tuple_row_size_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.cc b/be/src/runtime/data-stream-mgr.cc
index ef9669e..f36f4c9 100644
--- a/be/src/runtime/data-stream-mgr.cc
+++ b/be/src/runtime/data-stream-mgr.cc
@@ -73,7 +73,7 @@ inline uint32_t DataStreamMgr::GetHashValue(
 }
 
 shared_ptr<DataStreamRecvr> DataStreamMgr::CreateRecvr(RuntimeState* state,
-    const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
+    const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
     PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile,
     bool is_merging) {
   DCHECK(profile != NULL);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-mgr.h b/be/src/runtime/data-stream-mgr.h
index 8aa3b3a..7819c24 100644
--- a/be/src/runtime/data-stream-mgr.h
+++ b/be/src/runtime/data-stream-mgr.h
@@ -73,10 +73,9 @@ class DataStreamMgr {
   /// single stream.
   /// Ownership of the receiver is shared between this DataStream mgr instance and the
   /// caller.
-  std::shared_ptr<DataStreamRecvr> CreateRecvr(
-      RuntimeState* state, const RowDescriptor& row_desc,
-      const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
-      int num_senders, int buffer_size, RuntimeProfile* profile,
+  std::shared_ptr<DataStreamRecvr> CreateRecvr(RuntimeState* state,
+      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
+      PlanNodeId dest_node_id, int num_senders, int buffer_size, RuntimeProfile* profile,
       bool is_merging);
 
   /// Adds a row batch to the recvr identified by fragment_instance_id/dest_node_id

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-recvr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.cc b/be/src/runtime/data-stream-recvr.cc
index 50103ba..2acab8b 100644
--- a/be/src/runtime/data-stream-recvr.cc
+++ b/be/src/runtime/data-stream-recvr.cc
@@ -208,7 +208,7 @@ void DataStreamRecvr::SenderQueue::AddBatch(const TRowBatch& thrift_batch) {
       // Note: if this function makes a row batch, the batch *must* be added
       // to batch_queue_. It is not valid to create the row batch and destroy
       // it in this thread.
-      batch = new RowBatch(recvr_->row_desc(), thrift_batch, recvr_->mem_tracker());
+      batch = new RowBatch(recvr_->row_desc_, thrift_batch, recvr_->mem_tracker());
     }
     VLOG_ROW << "added #rows=" << batch->num_rows()
              << " batch_size=" << batch_size << "\n";
@@ -265,7 +265,7 @@ Status DataStreamRecvr::CreateMerger(const TupleRowComparator& less_than) {
   input_batch_suppliers.reserve(sender_queues_.size());
 
   // Create the merger that will produce a single stream of sorted rows.
-  merger_.reset(new SortedRunMerger(less_than, &row_desc_, profile_, false));
+  merger_.reset(new SortedRunMerger(less_than, row_desc_, profile_, false));
 
   for (int i = 0; i < sender_queues_.size(); ++i) {
     input_batch_suppliers.push_back(
@@ -284,7 +284,7 @@ void DataStreamRecvr::TransferAllResources(RowBatch* transfer_batch) {
 }
 
 DataStreamRecvr::DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
-    const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
+    const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
     PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
     RuntimeProfile* profile)
   : mgr_(stream_mgr),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-recvr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-recvr.h b/be/src/runtime/data-stream-recvr.h
index 07e9224..96adfc4 100644
--- a/be/src/runtime/data-stream-recvr.h
+++ b/be/src/runtime/data-stream-recvr.h
@@ -90,7 +90,7 @@ class DataStreamRecvr {
 
   const TUniqueId& fragment_instance_id() const { return fragment_instance_id_; }
   PlanNodeId dest_node_id() const { return dest_node_id_; }
-  const RowDescriptor& row_desc() const { return row_desc_; }
+  const RowDescriptor& row_desc() const { return *row_desc_; }
   MemTracker* mem_tracker() const { return mem_tracker_.get(); }
 
  private:
@@ -98,7 +98,7 @@ class DataStreamRecvr {
   class SenderQueue;
 
   DataStreamRecvr(DataStreamMgr* stream_mgr, MemTracker* parent_tracker,
-      const RowDescriptor& row_desc, const TUniqueId& fragment_instance_id,
+      const RowDescriptor* row_desc, const TUniqueId& fragment_instance_id,
       PlanNodeId dest_node_id, int num_senders, bool is_merging, int total_buffer_limit,
       RuntimeProfile* profile);
 
@@ -131,8 +131,8 @@ class DataStreamRecvr {
   /// exceeds this value
   int total_buffer_limit_;
 
-  /// Row schema, copied from the caller of CreateRecvr().
-  RowDescriptor row_desc_;
+  /// Row schema.
+  const RowDescriptor* row_desc_;
 
   /// True if this receiver merges incoming rows from different senders. Per-sender
   /// row batch queues are maintained in this case.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-sender.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.cc b/be/src/runtime/data-stream-sender.cc
index a45c2c4..aeadb56 100644
--- a/be/src/runtime/data-stream-sender.cc
+++ b/be/src/runtime/data-stream-sender.cc
@@ -66,9 +66,9 @@ class DataStreamSender::Channel : public CacheLineAligned {
   // combination. buffer_size is specified in bytes and a soft limit on
   // how much tuple data is getting accumulated before being sent; it only applies
   // when data is added via AddRow() and not sent directly via SendBatch().
-  Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
-          const TNetworkAddress& destination, const TUniqueId& fragment_instance_id,
-          PlanNodeId dest_node_id, int buffer_size)
+  Channel(DataStreamSender* parent, const RowDescriptor* row_desc,
+      const TNetworkAddress& destination, const TUniqueId& fragment_instance_id,
+      PlanNodeId dest_node_id, int buffer_size)
     : parent_(parent),
       buffer_size_(buffer_size),
       row_desc_(row_desc),
@@ -78,8 +78,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
       num_data_bytes_sent_(0),
       rpc_thread_("DataStreamSender", "SenderThread", 1, 1,
           bind<void>(mem_fn(&Channel::TransmitData), this, _1, _2)),
-      rpc_in_flight_(false) {
-  }
+      rpc_in_flight_(false) {}
 
   // Initialize channel.
   // Returns OK if successful, error indication otherwise.
@@ -115,7 +114,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
   DataStreamSender* parent_;
   int buffer_size_;
 
-  const RowDescriptor& row_desc_;
+  const RowDescriptor* row_desc_;
   TNetworkAddress address_;
   TUniqueId fragment_instance_id_;
   PlanNodeId dest_node_id_;
@@ -159,7 +158,7 @@ class DataStreamSender::Channel : public CacheLineAligned {
 Status DataStreamSender::Channel::Init(RuntimeState* state) {
   runtime_state_ = state;
   // TODO: figure out how to size batch_
-  int capacity = max(1, buffer_size_ / max(row_desc_.GetRowSize(), 1));
+  int capacity = max(1, buffer_size_ / max(row_desc_->GetRowSize(), 1));
   batch_.reset(new RowBatch(row_desc_, capacity, parent_->mem_tracker()));
   return Status::OK();
 }
@@ -250,7 +249,7 @@ Status DataStreamSender::Channel::AddRow(TupleRow* row) {
     RETURN_IF_ERROR(SendCurrentBatch());
   }
   TupleRow* dest = batch_->GetRow(batch_->AddRow());
-  const vector<TupleDescriptor*>& descs = row_desc_.tuple_descriptors();
+  const vector<TupleDescriptor*>& descs = row_desc_->tuple_descriptors();
   for (int i = 0; i < descs.size(); ++i) {
     if (UNLIKELY(row->GetTuple(i) == NULL)) {
       dest->SetTuple(i, NULL);
@@ -325,9 +324,8 @@ void DataStreamSender::Channel::Teardown(RuntimeState* state) {
   batch_.reset();
 }
 
-DataStreamSender::DataStreamSender(int sender_id,
-    const RowDescriptor& row_desc, const TDataStreamSink& sink,
-    const vector<TPlanFragmentDestination>& destinations,
+DataStreamSender::DataStreamSender(int sender_id, const RowDescriptor* row_desc,
+    const TDataStreamSink& sink, const vector<TPlanFragmentDestination>& destinations,
     int per_channel_buffer_size)
   : DataSink(row_desc),
     sender_id_(sender_id),
@@ -381,7 +379,7 @@ Status DataStreamSender::Init(const vector<TExpr>& thrift_output_exprs,
   if (partition_type_ == TPartitionType::HASH_PARTITIONED ||
       partition_type_ == TPartitionType::KUDU) {
     RETURN_IF_ERROR(ScalarExpr::Create(tsink.stream_sink.output_partition.partition_exprs,
-        row_desc_, state, &partition_exprs_));
+        *row_desc_, state, &partition_exprs_));
   }
   return Status::OK();
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-sender.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-sender.h b/be/src/runtime/data-stream-sender.h
index 0719daf..95a83e5 100644
--- a/be/src/runtime/data-stream-sender.h
+++ b/be/src/runtime/data-stream-sender.h
@@ -57,7 +57,7 @@ class DataStreamSender : public DataSink {
   /// The RowDescriptor must live until Close() is called.
   /// NOTE: supported partition types are UNPARTITIONED (broadcast), HASH_PARTITIONED,
   /// and RANDOM.
-  DataStreamSender(int sender_id, const RowDescriptor& row_desc,
+  DataStreamSender(int sender_id, const RowDescriptor* row_desc,
       const TDataStreamSink& tsink,
       const std::vector<TPlanFragmentDestination>& destinations,
       int per_channel_buffer_size);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/data-stream-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/data-stream-test.cc b/be/src/runtime/data-stream-test.cc
index 1048fb6..a358286 100644
--- a/be/src/runtime/data-stream-test.cc
+++ b/be/src/runtime/data-stream-test.cc
@@ -310,7 +310,7 @@ class DataStreamTest : public testing::Test {
 
   // Create batch_, but don't fill it with data yet. Assumes we created row_desc_.
   RowBatch* CreateRowBatch() {
-    RowBatch* batch = new RowBatch(*row_desc_, BATCH_CAPACITY, &tracker_);
+    RowBatch* batch = new RowBatch(row_desc_, BATCH_CAPACITY, &tracker_);
     int64_t* tuple_mem = reinterpret_cast<int64_t*>(
         batch->tuple_data_pool()->Allocate(BATCH_CAPACITY * 8));
     bzero(tuple_mem, BATCH_CAPACITY * 8);
@@ -341,7 +341,7 @@ class DataStreamTest : public testing::Test {
     GetNextInstanceId(&instance_id);
     receiver_info_.push_back(ReceiverInfo(stream_type, num_senders, receiver_num));
     ReceiverInfo& info = receiver_info_.back();
-    info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), *row_desc_,
+    info.stream_recvr = stream_mgr_->CreateRecvr(runtime_state_.get(), row_desc_,
         instance_id, DEST_NODE_ID, num_senders, buffer_size, profile, is_merging);
     if (!is_merging) {
       info.thread_handle = new thread(&DataStreamTest::ReadStream, this, &info);
@@ -380,7 +380,7 @@ class DataStreamTest : public testing::Test {
   void ReadStreamMerging(ReceiverInfo* info, RuntimeProfile* profile) {
     info->status = info->stream_recvr->CreateMerger(*less_than_);
     if (info->status.IsCancelled()) return;
-    RowBatch batch(*row_desc_, 1024, &tracker_);
+    RowBatch batch(row_desc_, 1024, &tracker_);
     VLOG_QUERY << "start reading merging";
     bool eos;
     while (!(info->status = info->stream_recvr->GetNext(&batch, &eos)).IsCancelled()) {
@@ -489,7 +489,7 @@ class DataStreamTest : public testing::Test {
     VLOG_QUERY << "create sender " << sender_num;
     const TDataSink& sink = GetSink(partition_type);
     DataStreamSender sender(
-        sender_num, *row_desc_, sink.stream_sink, dest_, channel_buffer_size);
+        sender_num, row_desc_, sink.stream_sink, dest_, channel_buffer_size);
 
     TExprNode expr_node;
     expr_node.node_type = TExprNodeType::SLOT_REF;
@@ -607,7 +607,7 @@ TEST_F(DataStreamTest, CloseRecvrWhileReferencesRemain) {
   TUniqueId instance_id;
   GetNextInstanceId(&instance_id);
   shared_ptr<DataStreamRecvr> stream_recvr = stream_mgr_->CreateRecvr(runtime_state.get(),
-      *row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile.get(), false);
+      row_desc_, instance_id, DEST_NODE_ID, 1, 1, profile.get(), false);
 
   // Perform tear down, but keep a reference to the receiver so that it is deleted last
   // (to confirm that the destructor does not access invalid state after tear-down).

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/descriptors.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index ad66c56..0f97f76 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -428,7 +428,7 @@ bool RowDescriptor::IsAnyTupleNullable() const {
   return false;
 }
 
-void RowDescriptor::ToThrift(vector<TTupleId>* row_tuple_ids) {
+void RowDescriptor::ToThrift(vector<TTupleId>* row_tuple_ids) const {
   row_tuple_ids->clear();
   for (int i = 0; i < tuple_desc_map_.size(); ++i) {
     row_tuple_ids->push_back(tuple_desc_map_[i]->id());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/descriptors.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 8a8dd7b..bdb8f8c 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -492,7 +492,9 @@ class DescriptorTbl {
       const HdfsTableDescriptor& hdfs_tbl, ObjectPool* pool) WARN_UNUSED_RESULT;
 };
 
-/// Records positions of tuples within row produced by ExecNode.
+/// Records positions of tuples within row produced by ExecNode. RowDescriptors are
+/// typically owned by their ExecNode, and shared by reference across the plan tree.
+///
 /// TODO: this needs to differentiate between tuples contained in row
 /// and tuples produced by ExecNode (parallel to PlanNode.rowTupleIds and
 /// PlanNode.tupleIds); right now, we conflate the two (and distinguish based on
@@ -544,7 +546,7 @@ class RowDescriptor {
   }
 
   /// Populate row_tuple_ids with our ids.
-  void ToThrift(std::vector<TTupleId>* row_tuple_ids);
+  void ToThrift(std::vector<TTupleId>* row_tuple_ids) const;
 
   /// Return true if the tuple ids of this descriptor are a prefix
   /// of the tuple ids of other_desc.
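
The expanded class comment above is the contract the rest of the patch leans on: each RowDescriptor has one owner, and every consumer now holds a non-owning pointer instead of a copy or a reference member. Roughly, under that model (hypothetical node class, not part of the patch):

    class MyExecNode {
     public:
      // Consumers borrow the descriptor through a raw pointer.
      const RowDescriptor* row_desc() const { return &row_desc_; }
     private:
      RowDescriptor row_desc_;  // single owner; lives as long as the node
    };

Objects built from that pointer, such as a RowBatch, a BufferedTupleStreamV2 or a SortedRunMerger, must be closed or destroyed before the owning node is.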

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/row-batch-serialize-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-serialize-test.cc b/be/src/runtime/row-batch-serialize-test.cc
index f4c4f0b..7a084fc 100644
--- a/be/src/runtime/row-batch-serialize-test.cc
+++ b/be/src/runtime/row-batch-serialize-test.cc
@@ -68,7 +68,7 @@ class RowBatchSerializeTest : public testing::Test {
     TRowBatch trow_batch;
     EXPECT_OK(batch->Serialize(&trow_batch, full_dedup));
 
-    RowBatch deserialized_batch(row_desc, trow_batch, tracker_.get());
+    RowBatch deserialized_batch(&row_desc, trow_batch, tracker_.get());
     if (print_batches) cout << PrintBatch(&deserialized_batch) << endl;
 
     EXPECT_EQ(batch->num_rows(), deserialized_batch.num_rows());
@@ -198,7 +198,7 @@ class RowBatchSerializeTest : public testing::Test {
 
   // Creates a row batch with randomized values.
   RowBatch* CreateRowBatch(const RowDescriptor& row_desc) {
-    RowBatch* batch = pool_.Add(new RowBatch(row_desc, NUM_ROWS, tracker_.get()));
+    RowBatch* batch = pool_.Add(new RowBatch(&row_desc, NUM_ROWS, tracker_.get()));
     int len = row_desc.GetRowSize() * NUM_ROWS;
     uint8_t* tuple_mem = batch->tuple_data_pool()->Allocate(len);
     memset(tuple_mem, 0, len);
@@ -259,7 +259,7 @@ class RowBatchSerializeTest : public testing::Test {
   // order provided, starting at the beginning once all are used.
   void AddTuplesToRowBatch(int num_rows, const vector<vector<Tuple*>>& tuples,
       const vector<int>& repeats, RowBatch* batch) {
-    int tuples_per_row = batch->row_desc().tuple_descriptors().size();
+    int tuples_per_row = batch->row_desc()->tuple_descriptors().size();
     ASSERT_EQ(tuples_per_row, tuples.size());
     ASSERT_EQ(tuples_per_row, repeats.size());
     vector<int> next_tuple(tuples_per_row, 0);
@@ -447,7 +447,7 @@ void RowBatchSerializeTest::TestDupCorrectness(bool full_dedup) {
   repeats.push_back(1);
   // All string dups are consecutive
   repeats.push_back(num_rows / distinct_string_tuples + 1);
-  RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, tracker_.get()));
+  RowBatch* batch = pool_.Add(new RowBatch(&row_desc, num_rows, tracker_.get()));
   vector<vector<Tuple*>> distinct_tuples(2);
   CreateTuples(*row_desc.tuple_descriptors()[0], batch->tuple_data_pool(),
       distinct_int_tuples, 0, 10, &distinct_tuples[0]);
@@ -481,7 +481,7 @@ void RowBatchSerializeTest::TestDupRemoval(bool full_dedup) {
   int num_distinct_tuples = 100;
   // All dups are consecutive
   int repeats = num_rows / num_distinct_tuples;
-  RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, tracker_.get()));
+  RowBatch* batch = pool_.Add(new RowBatch(&row_desc, num_rows, tracker_.get()));
   vector<Tuple*> tuples;
   CreateTuples(tuple_desc, batch->tuple_data_pool(), num_distinct_tuples, 0, 10, &tuples);
   AddTuplesToRowBatch(num_rows, tuples, repeats, batch);
@@ -517,7 +517,7 @@ void RowBatchSerializeTest::TestConsecutiveNulls(bool full_dedup) {
   int num_rows = 100;
   int num_distinct_tuples = 20;
   int repeats = 5;
-  RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, tracker_.get()));
+  RowBatch* batch = pool_.Add(new RowBatch(&row_desc, num_rows, tracker_.get()));
   vector<Tuple*> tuples;
   CreateTuples(*row_desc.tuple_descriptors()[0], batch->tuple_data_pool(),
       num_distinct_tuples, 50, 10, &tuples);
@@ -587,7 +587,7 @@ TEST_F(RowBatchSerializeTest, DedupPathologicalFull) {
   vector<vector<Tuple*>> tuples(num_tuples);
   vector<int> repeats(num_tuples, 1); // Don't repeat tuples adjacently
   int64_t total_byte_size = 0;
-  RowBatch* batch = pool_.Add(new RowBatch(row_desc, num_rows, tracker_.get()));
+  RowBatch* batch = pool_.Add(new RowBatch(&row_desc, num_rows, tracker_.get()));
   // First two tuples are integers because it doesn't matter for this test.
   for (int tuple_idx = 0; tuple_idx < num_int_tuples; ++tuple_idx) {
     TupleDescriptor* tuple_desc = row_desc.tuple_descriptors()[tuple_idx];
@@ -630,7 +630,7 @@ TEST_F(RowBatchSerializeTest, DedupPathologicalFull) {
   // Serialized data should only have one copy of each tuple.
   EXPECT_EQ(total_byte_size, trow_batch.uncompressed_size);
   LOG(INFO) << "Deserializing row batch";
-  RowBatch deserialized_batch(row_desc, trow_batch, tracker_.get());
+  RowBatch deserialized_batch(&row_desc, trow_batch, tracker_.get());
   LOG(INFO) << "Verifying row batch";
   // Need to do special verification: comparing all duplicate strings is too slow.
   EXPECT_EQ(batch->num_rows(), deserialized_batch.num_rows());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/row-batch-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch-test.cc b/be/src/runtime/row-batch-test.cc
index bbdcc2d..dc9c395 100644
--- a/be/src/runtime/row-batch-test.cc
+++ b/be/src/runtime/row-batch-test.cc
@@ -47,25 +47,25 @@ TEST(RowBatchTest, AcquireStateWithMarkFlushResources) {
   RowDescriptor row_desc(*desc_tbl, tuple_id, nullable_tuples);
   MemTracker tracker;
   {
-    RowBatch src(row_desc, 1024, &tracker);
+    RowBatch src(&row_desc, 1024, &tracker);
     src.AddRow();
     src.CommitLastRow();
     // Calls MarkFlushResources().
     src.MarkNeedsDeepCopy();
 
     // Note InitialCapacity(), not capacity(). Latter will DCHECK.
-    RowBatch dest(row_desc, src.InitialCapacity(), &tracker);
+    RowBatch dest(&row_desc, src.InitialCapacity(), &tracker);
     dest.AcquireState(&src);
   }
 
   // Confirm the bad pattern causes an error.
   {
-    RowBatch src(row_desc, 1024, &tracker);
+    RowBatch src(&row_desc, 1024, &tracker);
     src.AddRow();
     src.CommitLastRow();
     // Calls MarkFlushResources().
     src.MarkNeedsDeepCopy();
-    RowBatch bad_dest(row_desc, src.capacity(), &tracker);
+    RowBatch bad_dest(&row_desc, src.capacity(), &tracker);
     IMPALA_ASSERT_DEBUG_DEATH(bad_dest.AcquireState(&src), "");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index 7668063..9bb96aa 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -42,12 +42,12 @@ namespace impala {
 const int RowBatch::AT_CAPACITY_MEM_USAGE;
 const int RowBatch::FIXED_LEN_BUFFER_LIMIT;
 
-RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_tracker)
+RowBatch::RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* mem_tracker)
   : num_rows_(0),
     capacity_(capacity),
     flush_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
-    num_tuples_per_row_(row_desc.tuple_descriptors().size()),
+    num_tuples_per_row_(row_desc->tuple_descriptors().size()),
     auxiliary_mem_usage_(0),
     tuple_data_pool_(mem_tracker),
     row_desc_(row_desc),
@@ -73,7 +73,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_
 // to allocated string data in special mempool
 // (change via python script that runs over Data_types.cc)
 RowBatch::RowBatch(
-    const RowDescriptor& row_desc, const TRowBatch& input_batch, MemTracker* mem_tracker)
+    const RowDescriptor* row_desc, const TRowBatch& input_batch, MemTracker* mem_tracker)
   : num_rows_(input_batch.num_rows),
     capacity_(input_batch.num_rows),
     flush_(FlushMode::NO_FLUSH_RESOURCES),
@@ -85,7 +85,7 @@ RowBatch::RowBatch(
     mem_tracker_(mem_tracker) {
   DCHECK(mem_tracker_ != NULL);
   tuple_ptrs_size_ = num_rows_ * input_batch.row_tuples.size() * sizeof(Tuple*);
-  DCHECK_EQ(input_batch.row_tuples.size(), row_desc.tuple_descriptors().size());
+  DCHECK_EQ(input_batch.row_tuples.size(), row_desc->tuple_descriptors().size());
   DCHECK_GT(tuple_ptrs_size_, 0);
   // TODO: switch to Init() pattern so we can check memory limit and return Status.
   if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) {
@@ -131,7 +131,7 @@ RowBatch::RowBatch(
   }
 
   // Check whether we have slots that require offset-to-pointer conversion.
-  if (!row_desc_.HasVarlenSlots()) return;
+  if (!row_desc_->HasVarlenSlots()) return;
 
   // For every unique tuple, convert string offsets contained in tuple data into
   // pointers. Tuples were serialized in the order we are deserializing them in,
@@ -140,7 +140,7 @@ RowBatch::RowBatch(
   Tuple* last_converted = NULL;
   for (int i = 0; i < num_rows_; ++i) {
     for (int j = 0; j < num_tuples_per_row_; ++j) {
-      const TupleDescriptor* desc = row_desc_.tuple_descriptors()[j];
+      const TupleDescriptor* desc = row_desc_->tuple_descriptors()[j];
       if (!desc->HasVarlenSlots()) continue;
       Tuple* tuple = GetRow(i)->GetTuple(j);
       // Handle NULL or already converted tuples with one check.
@@ -182,7 +182,7 @@ Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) {
   output_batch->compression_type = THdfsCompression::NONE;
 
   output_batch->num_rows = num_rows_;
-  row_desc_.ToThrift(&output_batch->row_tuples);
+  row_desc_->ToThrift(&output_batch->row_tuples);
 
   // As part of the serialization process we deduplicate tuples to avoid serializing a
   // Tuple multiple times for the RowBatch. By default we only detect duplicate tuples
@@ -232,10 +232,10 @@ bool RowBatch::UseFullDedup() {
   // be common: when a row contains tuples with collections and where there are three or
   // more tuples per row so non-adjacent duplicate tuples may have been created when
   // joining tuples from multiple sources into a single row.
-  if (row_desc_.tuple_descriptors().size() < 3) return false;
+  if (row_desc_->tuple_descriptors().size() < 3) return false;
   vector<TupleDescriptor*>::const_iterator tuple_desc =
-      row_desc_.tuple_descriptors().begin();
-  for (; tuple_desc != row_desc_.tuple_descriptors().end(); ++tuple_desc) {
+      row_desc_->tuple_descriptors().begin();
+  for (; tuple_desc != row_desc_->tuple_descriptors().end(); ++tuple_desc) {
     if (!(*tuple_desc)->collection_slots().empty()) return true;
   }
   return false;
@@ -260,8 +260,9 @@ void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
   char* tuple_data = const_cast<char*>(output_batch->tuple_data.c_str());
 
   for (int i = 0; i < num_rows_; ++i) {
-    vector<TupleDescriptor*>::const_iterator desc = row_desc_.tuple_descriptors().begin();
-    for (int j = 0; desc != row_desc_.tuple_descriptors().end(); ++desc, ++j) {
+    vector<TupleDescriptor*>::const_iterator desc =
+        row_desc_->tuple_descriptors().begin();
+    for (int j = 0; desc != row_desc_->tuple_descriptors().end(); ++desc, ++j) {
       Tuple* tuple = GetRow(i)->GetTuple(j);
       if (UNLIKELY(tuple == NULL)) {
         // NULLs are encoded as -1
@@ -382,7 +383,7 @@ int RowBatch::GetBatchSize(const TRowBatch& batch) {
 }
 
 void RowBatch::AcquireState(RowBatch* src) {
-  DCHECK(row_desc_.LayoutEquals(src->row_desc_));
+  DCHECK(row_desc_->LayoutEquals(*src->row_desc_));
   DCHECK_EQ(num_tuples_per_row_, src->num_tuples_per_row_);
   DCHECK_EQ(tuple_ptrs_size_, src->tuple_ptrs_size_);
 
@@ -405,7 +406,7 @@ void RowBatch::AcquireState(RowBatch* src) {
 }
 
 void RowBatch::DeepCopyTo(RowBatch* dst) {
-  DCHECK(dst->row_desc_.Equals(row_desc_));
+  DCHECK(dst->row_desc_->Equals(*row_desc_));
   DCHECK_EQ(dst->num_rows_, 0);
   DCHECK_GE(dst->capacity_, num_rows_);
   dst->AddRows(num_rows_);
@@ -413,8 +414,8 @@ void RowBatch::DeepCopyTo(RowBatch* dst) {
     TupleRow* src_row = GetRow(i);
     TupleRow* dst_row = reinterpret_cast<TupleRow*>(dst->tuple_ptrs_ +
         i * num_tuples_per_row_);
-    src_row->DeepCopy(dst_row, row_desc_.tuple_descriptors(), &dst->tuple_data_pool_,
-        false);
+    src_row->DeepCopy(
+        dst_row, row_desc_->tuple_descriptors(), &dst->tuple_data_pool_, false);
   }
   dst->CommitRows(num_rows_);
 }
@@ -423,7 +424,7 @@ void RowBatch::DeepCopyTo(RowBatch* dst) {
 int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) {
   DCHECK(distinct_tuples == NULL || distinct_tuples->size() == 0);
   int64_t result = 0;
-  vector<int> tuple_count(row_desc_.tuple_descriptors().size(), 0);
+  vector<int> tuple_count(row_desc_->tuple_descriptors().size(), 0);
 
   // Sum total variable length byte sizes.
   for (int i = 0; i < num_rows_; ++i) {
@@ -435,17 +436,17 @@ int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) {
         // Fast tuple deduplication for adjacent rows.
         continue;
       } else if (UNLIKELY(distinct_tuples != NULL)) {
-        if (row_desc_.tuple_descriptors()[j]->byte_size() == 0) continue;
+        if (row_desc_->tuple_descriptors()[j]->byte_size() == 0) continue;
         bool inserted = distinct_tuples->InsertIfNotPresent(tuple, -1);
         if (!inserted) continue;
       }
-      result += tuple->VarlenByteSize(*row_desc_.tuple_descriptors()[j]);
+      result += tuple->VarlenByteSize(*row_desc_->tuple_descriptors()[j]);
       ++tuple_count[j];
     }
   }
   // Compute sum of fixed component of tuple sizes.
   for (int j = 0; j < num_tuples_per_row_; ++j) {
-    result += row_desc_.tuple_descriptors()[j]->byte_size() * tuple_count[j];
+    result += row_desc_->tuple_descriptors()[j]->byte_size() * tuple_count[j];
   }
   return result;
 }
@@ -453,7 +454,7 @@ int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) {
 Status RowBatch::ResizeAndAllocateTupleBuffer(
     RuntimeState* state, int64_t* buffer_size, uint8_t** buffer) {
   return ResizeAndAllocateTupleBuffer(
-      state, &tuple_data_pool_, row_desc_.GetRowSize(), &capacity_, buffer_size, buffer);
+      state, &tuple_data_pool_, row_desc_->GetRowSize(), &capacity_, buffer_size, buffer);
 }
 
 Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state, MemPool* pool,
@@ -475,7 +476,7 @@ void RowBatch::VLogRows(const string& context) {
   if (!VLOG_ROW_IS_ON) return;
   VLOG_ROW << context << ": #rows=" << num_rows_;
   for (int i = 0; i < num_rows_; ++i) {
-    VLOG_ROW << PrintRow(GetRow(i), row_desc_);
+    VLOG_ROW << PrintRow(GetRow(i), *row_desc_);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 0068478..3cf1093 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -85,15 +85,15 @@ class RowBatch {
   /// Create RowBatch for a maximum of 'capacity' rows of tuples specified
   /// by 'row_desc'.
   /// tracker cannot be NULL.
-  RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* tracker);
+  RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* tracker);
 
   /// Populate a row batch from input_batch by copying input_batch's
   /// tuple_data into the row batch's mempool and converting all offsets
   /// in the data back into pointers.
   /// TODO: figure out how to transfer the data from input_batch to this RowBatch
   /// (so that we don't need to make yet another copy)
-  RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
-      MemTracker* tracker);
+  RowBatch(
+      const RowDescriptor* row_desc, const TRowBatch& input_batch, MemTracker* tracker);
 
   /// Releases all resources accumulated at this row batch.  This includes
   ///  - tuple_ptrs
@@ -321,7 +321,7 @@ class RowBatch {
     return tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*));
   }
 
-  const RowDescriptor& row_desc() const { return row_desc_; }
+  const RowDescriptor* row_desc() const { return row_desc_; }
 
   /// Max memory that this row batch can accumulate before it is considered at capacity.
   /// This is a soft capacity: row batches may exceed the capacity, preferably only by a
@@ -424,8 +424,9 @@ class RowBatch {
   // Less frequently used members that are not accessed on performance-critical paths
   // should go below here.
 
-  /// Full row descriptor for rows in this batch.
-  RowDescriptor row_desc_;
+  /// Full row descriptor for rows in this batch. Owned by the exec node that produced
+  /// this batch.
+  const RowDescriptor* row_desc_;
 
   MemTracker* mem_tracker_;  // not owned
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/sorted-run-merger.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.cc b/be/src/runtime/sorted-run-merger.cc
index 36d7a61..673d6c3 100644
--- a/be/src/runtime/sorted-run-merger.cc
+++ b/be/src/runtime/sorted-run-merger.cc
@@ -126,7 +126,7 @@ void SortedRunMerger::Heapify(int parent_index) {
 }
 
 SortedRunMerger::SortedRunMerger(const TupleRowComparator& comparator,
-    RowDescriptor* row_desc, RuntimeProfile* profile, bool deep_copy_input)
+    const RowDescriptor* row_desc, RuntimeProfile* profile, bool deep_copy_input)
   : comparator_(comparator),
     input_row_desc_(row_desc),
     deep_copy_input_(deep_copy_input) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/sorted-run-merger.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorted-run-merger.h b/be/src/runtime/sorted-run-merger.h
index 563b6f7..99ffbe9 100644
--- a/be/src/runtime/sorted-run-merger.h
+++ b/be/src/runtime/sorted-run-merger.h
@@ -55,7 +55,7 @@ class SortedRunMerger {
   /// zero).
   typedef boost::function<Status (RowBatch**)> RunBatchSupplierFn;
 
-  SortedRunMerger(const TupleRowComparator& comparator, RowDescriptor* row_desc,
+  SortedRunMerger(const TupleRowComparator& comparator, const RowDescriptor* row_desc,
       RuntimeProfile* profile, bool deep_copy_input);
 
   /// Prepare this merger to merge and return rows from the sorted runs in 'input_runs'.
@@ -96,7 +96,7 @@ class SortedRunMerger {
 
   /// Descriptor for the rows provided by the input runs. Owned by the exec-node through
   /// which this merger was created.
-  RowDescriptor* input_row_desc_;
+  const RowDescriptor* input_row_desc_;
 
   /// True if rows must be deep copied into the output batch.
   bool deep_copy_input_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/runtime/sorter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 4c33d7f..b8182e2 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -538,8 +538,8 @@ Status Sorter::Run::AddBatchInternal(RowBatch* batch, int start_index, int* num_
 
   if (!INITIAL_RUN) {
     // For intermediate merges, the input row is the sort tuple.
-    DCHECK_EQ(batch->row_desc().tuple_descriptors().size(), 1);
-    DCHECK_EQ(batch->row_desc().tuple_descriptors()[0], sort_tuple_desc_);
+    DCHECK_EQ(batch->row_desc()->tuple_descriptors().size(), 1);
+    DCHECK_EQ(batch->row_desc()->tuple_descriptors()[0], sort_tuple_desc_);
   }
 
   /// Keep initial unsorted runs pinned in memory so we can sort them.
@@ -756,8 +756,8 @@ Status Sorter::Run::PrepareRead(bool* pinned_all_blocks) {
   end_of_fixed_len_block_ = end_of_var_len_block_ = fixed_len_blocks_.empty();
   num_tuples_returned_ = 0;
 
-  buffered_batch_.reset(new RowBatch(*sorter_->output_row_desc_,
-      sorter_->state_->batch_size(), sorter_->mem_tracker_));
+  buffered_batch_.reset(new RowBatch(
+      sorter_->output_row_desc_, sorter_->state_->batch_size(), sorter_->mem_tracker_));
 
   // If the run is pinned, all blocks are already pinned, so we're ready to read.
   if (is_pinned_) {
@@ -1587,8 +1587,7 @@ Status Sorter::CreateMerger(int max_num_runs) {
 }
 
 Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) {
-  RowBatch intermediate_merge_batch(*output_row_desc_, state_->batch_size(),
-      mem_tracker_);
+  RowBatch intermediate_merge_batch(output_row_desc_, state_->batch_size(), mem_tracker_);
   bool eos = false;
   while (!eos) {
     // Copy rows into the new run until done.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/317c413a/be/src/util/debug-util.cc
----------------------------------------------------------------------
diff --git a/be/src/util/debug-util.cc b/be/src/util/debug-util.cc
index d04550c..6a94496 100644
--- a/be/src/util/debug-util.cc
+++ b/be/src/util/debug-util.cc
@@ -224,7 +224,7 @@ string PrintRow(TupleRow* row, const RowDescriptor& d) {
 string PrintBatch(RowBatch* batch) {
   stringstream out;
   for (int i = 0; i < batch->num_rows(); ++i) {
-    out << PrintRow(batch->GetRow(i), batch->row_desc()) << "\n";
+    out << PrintRow(batch->GetRow(i), *batch->row_desc()) << "\n";
   }
   return out.str();
 }
