http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/runtime/buffered-tuple-stream-v2.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc 
b/be/src/runtime/buffered-tuple-stream-v2.cc
deleted file mode 100644
index 2153264..0000000
--- a/be/src/runtime/buffered-tuple-stream-v2.cc
+++ /dev/null
@@ -1,1084 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/buffered-tuple-stream-v2.inline.h"
-
-#include <boost/bind.hpp>
-#include <gutil/strings/substitute.h>
-
-#include "runtime/bufferpool/reservation-tracker.h"
-#include "runtime/collection-value.h"
-#include "runtime/descriptors.h"
-#include "runtime/exec-env.h"
-#include "runtime/mem-tracker.h"
-#include "runtime/row-batch.h"
-#include "runtime/runtime-state.h"
-#include "runtime/string-value.h"
-#include "runtime/tuple-row.h"
-#include "util/bit-util.h"
-#include "util/debug-util.h"
-#include "util/runtime-profile-counters.h"
-
-#include "common/names.h"
-
-// The consistency checks are only compiled into debug builds: when NDEBUG is
-// defined both macros expand to nothing.
-#ifndef NDEBUG
-#define CHECK_CONSISTENCY_FAST() CheckConsistencyFast()
-#define CHECK_CONSISTENCY_FULL() CheckConsistencyFull()
-#else
-#define CHECK_CONSISTENCY_FAST()
-#define CHECK_CONSISTENCY_FULL()
-#endif
-
-using namespace impala;
-using namespace strings;
-
-using BufferHandle = BufferPool::BufferHandle;
-
-// Creates a stream that starts out pinned, with no pages and no read or write
-// iterator. For each tuple in 'row_desc' the constructor records the tuple's
-// fixed byte size and collects the var-len slots whose ids are not listed in
-// 'ext_varlen_slots', separated into string slots and collection slots.
-// Both page lengths must be powers of two, with
-// 'max_page_len' >= 'default_page_len'.
-BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
-    const RowDescriptor* row_desc,
-    BufferPool::ClientHandle* buffer_pool_client, int64_t default_page_len,
-    int64_t max_page_len, const set<SlotId>& ext_varlen_slots)
-  : state_(state),
-    desc_(row_desc),
-    node_id_(-1),
-    buffer_pool_(state->exec_env()->buffer_pool()),
-    buffer_pool_client_(buffer_pool_client),
-    num_pages_(0),
-    total_byte_size_(0),
-    has_read_iterator_(false),
-    read_page_reservation_(buffer_pool_client_),
-    read_page_rows_returned_(-1),
-    read_ptr_(nullptr),
-    read_end_ptr_(nullptr),
-    write_ptr_(nullptr),
-    write_end_ptr_(nullptr),
-    rows_returned_(0),
-    has_write_iterator_(false),
-    write_page_(nullptr),
-    write_page_reservation_(buffer_pool_client_),
-    bytes_pinned_(0),
-    num_rows_(0),
-    default_page_len_(default_page_len),
-    max_page_len_(max_page_len),
-    has_nullable_tuple_(row_desc->IsAnyTupleNullable()),
-    delete_on_read_(false),
-    closed_(false),
-    pinned_(true) {
-  DCHECK_GE(max_page_len, default_page_len);
-  DCHECK(BitUtil::IsPowerOf2(default_page_len)) << default_page_len;
-  DCHECK(BitUtil::IsPowerOf2(max_page_len)) << max_page_len;
-  // No read page until a read iterator advances onto one.
-  read_page_ = pages_.end();
-
-  const auto& tuple_descs = desc_->tuple_descriptors();
-  for (int tuple_idx = 0; tuple_idx < tuple_descs.size(); ++tuple_idx) {
-    const TupleDescriptor* tuple_desc = tuple_descs[tuple_idx];
-    const int fixed_size = tuple_desc->byte_size();
-    fixed_tuple_sizes_.push_back(fixed_size);
-
-    vector<SlotDescriptor*> string_slots;
-    vector<SlotDescriptor*> coll_slots;
-    for (SlotDescriptor* slot : tuple_desc->slots()) {
-      // Fixed-length slots and slots listed in 'ext_varlen_slots' are not
-      // tracked here.
-      if (!slot->type().IsVarLenType()) continue;
-      if (ext_varlen_slots.count(slot->id()) > 0) continue;
-      if (slot->type().IsVarLenStringType()) {
-        string_slots.push_back(slot);
-      } else {
-        DCHECK(slot->type().IsCollectionType());
-        coll_slots.push_back(slot);
-      }
-    }
-    // Only remember tuples that actually have slots of the given kind; the
-    // tuple index is stored alongside the slot list.
-    if (!string_slots.empty()) {
-      inlined_string_slots_.push_back(make_pair(tuple_idx, string_slots));
-    }
-    if (!coll_slots.empty()) {
-      inlined_coll_slots_.push_back(make_pair(tuple_idx, coll_slots));
-    }
-  }
-}
-
-// Destroying a stream that was never closed is a usage error: debug builds
-// assert that 'closed_' is set (Close() in this file is what sets it).
-BufferedTupleStreamV2::~BufferedTupleStreamV2() {
-  DCHECK(closed_);
-}
-
-// Debug-only check that walks the whole page list. Runs the O(1) checks
-// first, then verifies the cached 'bytes_pinned_' and 'num_pages_' counters
-// against the page list and checks every page individually.
-void BufferedTupleStreamV2::CheckConsistencyFull() const {
-  CheckConsistencyFast();
-  // Everything below is linear in the number of pages.
-  DCHECK_EQ(bytes_pinned_, CalcBytesPinned()) << DebugString();
-  DCHECK_EQ(pages_.size(), num_pages_) << DebugString();
-  for (auto it = pages_.begin(); it != pages_.end(); ++it) {
-    CheckPageConsistency(&*it);
-  }
-}
-
-// Debug-only invariant checks that do not iterate over the page list.
-// Verifies the write page against the write iterator and write pointers, the
-// read page against the read iterator and read pointers, and that each saved
-// reservation holds exactly 'default_page_len_' when the matching
-// Need*Reservation() predicate is true and zero otherwise (a closed
-// reservation is not inspected).
-void BufferedTupleStreamV2::CheckConsistencyFast() const {
-  // All the below checks should be O(1).
-  DCHECK(has_write_iterator() || write_page_ == nullptr);
-  if (write_page_ != nullptr) {
-    CheckPageConsistency(write_page_);
-    DCHECK(write_page_->is_pinned());
-    DCHECK(write_page_->retrieved_buffer);
-    const BufferHandle* write_buffer;
-    Status status = write_page_->GetBuffer(&write_buffer);
-    DCHECK(status.ok()); // Write buffer should never have been unpinned.
-    DCHECK_GE(write_ptr_, write_buffer->data());
-    DCHECK_EQ(write_end_ptr_, write_buffer->data() + write_page_->len());
-    DCHECK_GE(write_end_ptr_, write_ptr_);
-  }
-  DCHECK(has_read_iterator() || read_page_ == pages_.end());
-  if (read_page_ != pages_.end()) {
-    CheckPageConsistency(&*read_page_);
-    DCHECK(read_page_->is_pinned());
-    DCHECK(read_page_->retrieved_buffer);
-    // Can't check read buffer without affecting behaviour, because a read may
-    // be in flight and this would require blocking on that read.
-    DCHECK_GE(read_end_ptr_, read_ptr_);
-  }
-  if (NeedReadReservation()) {
-    DCHECK_EQ(default_page_len_, read_page_reservation_.GetReservation())
-        << DebugString();
-  } else if (!read_page_reservation_.is_closed()) {
-    DCHECK_EQ(0, read_page_reservation_.GetReservation());
-  }
-  if (NeedWriteReservation()) {
-    DCHECK_EQ(default_page_len_, write_page_reservation_.GetReservation());
-  } else if (!write_page_reservation_.is_closed()) {
-    DCHECK_EQ(0, write_page_reservation_.GetReservation());
-  }
-}
-
-// Debug-only checks on a single page: its pin count must match what
-// ExpectedPinCount() computes for the stream's current pinned state, a page
-// longer than the default length may hold at most one row, and no page may
-// be empty.
-void BufferedTupleStreamV2::CheckPageConsistency(const Page* page) const {
-  DCHECK_EQ(ExpectedPinCount(pinned_, page), page->pin_count()) << 
DebugString();
-  // Only one large row per page.
-  if (page->len() > default_page_len_) DCHECK_LE(page->num_rows, 1);
-  // We only create pages when we have a row to append to them.
-  DCHECK_GT(page->num_rows, 0);
-}
-
-// Returns a multi-line, human-readable dump of the stream: the row counters
-// and mode flags, the read and write page addresses, the two saved
-// reservations (or "<closed>" for a closed one) and one entry per page.
-string BufferedTupleStreamV2::DebugString() const {
-  stringstream ss;
-  ss << "BufferedTupleStreamV2 num_rows=" << num_rows_
-     << " rows_returned=" << rows_returned_ << " pinned=" << pinned_
-     << " delete_on_read=" << delete_on_read_ << " closed=" << closed_ << "\n"
-     << " bytes_pinned=" << bytes_pinned_ << " has_write_iterator=" << 
has_write_iterator_
-     << " write_page=" << write_page_ << " has_read_iterator=" << 
has_read_iterator_
-     << " read_page=";
-  // 'read_page_' is an iterator, so the end position cannot be dereferenced.
-  if (read_page_ == pages_.end()) {
-    ss << "<end>";
-  } else {
-    ss << &*read_page_;
-  }
-  ss << "\n"
-     << " read_page_reservation=";
-  // GetReservation() is only called on a reservation that is not closed.
-  if (read_page_reservation_.is_closed()) {
-    ss << "<closed>";
-  } else {
-    ss << read_page_reservation_.GetReservation();
-  }
-  ss << " write_page_reservation=";
-  if (write_page_reservation_.is_closed()) {
-    ss << "<closed>";
-  } else {
-    ss << write_page_reservation_.GetReservation();
-  }
-  ss << "\n # pages=" << num_pages_ << " pages=[\n";
-  for (const Page& page : pages_) {
-    ss << "{" << page.DebugString() << "}";
-    // Separate entries with a comma, but not after the last page.
-    if (&page != &pages_.back()) ss << ",\n";
-  }
-  ss << "]";
-  return ss.str();
-}
-
-// Returns the page handle's debug string followed by the page's row count.
-string BufferedTupleStreamV2::Page::DebugString() const {
-  return Substitute("$0 num_rows=$1", handle.DebugString(), num_rows);
-}
-
-// Records 'node_id' (used when reporting MAX_ROW_SIZE errors) and, when
-// 'pinned' is false, switches the stream to unpinned mode with
-// UNPIN_ALL_EXCEPT_CURRENT. Always returns OK.
-Status BufferedTupleStreamV2::Init(int node_id, bool pinned) {
-  if (!pinned) UnpinStream(UNPIN_ALL_EXCEPT_CURRENT);
-  node_id_ = node_id;
-  return Status::OK();
-}
-
-// Prepares an empty stream for writing only. Tries to grow the client's
-// reservation so that one default-sized page fits; '*got_reservation'
-// reports whether that succeeded. On failure the stream is left unchanged
-// and OK is returned. On success the write iterator is created and
-// 'default_page_len_' bytes are saved in 'write_page_reservation_'.
-Status BufferedTupleStreamV2::PrepareForWrite(bool* got_reservation) {
-  // This must be the first iterator created.
-  DCHECK(pages_.empty());
-  DCHECK(!delete_on_read_);
-  DCHECK(!has_write_iterator());
-  DCHECK(!has_read_iterator());
-  CHECK_CONSISTENCY_FULL();
-
-  *got_reservation = 
buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
-  if (!*got_reservation) return Status::OK();
-  has_write_iterator_ = true;
-  // Save reservation for the write iterators.
-  buffer_pool_client_->SaveReservation(&write_page_reservation_, 
default_page_len_);
-  CHECK_CONSISTENCY_FULL();
-  return Status::OK();
-}
-
-// Prepares an empty stream for interleaved writing and reading. Tries to
-// grow the client's reservation to fit two default-sized pages;
-// '*got_reservation' reports whether that succeeded. On failure the stream
-// is left unchanged and OK is returned. On success one default page length
-// is saved for each of the read and write iterators and the read iterator is
-// set up through PrepareForReadInternal(), whose status is returned.
-Status BufferedTupleStreamV2::PrepareForReadWrite(
-    bool delete_on_read, bool* got_reservation) {
-  // This must be the first iterator created.
-  DCHECK(pages_.empty());
-  DCHECK(!delete_on_read_);
-  DCHECK(!has_write_iterator());
-  DCHECK(!has_read_iterator());
-  CHECK_CONSISTENCY_FULL();
-
-  *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * 
default_page_len_);
-  if (!*got_reservation) return Status::OK();
-  has_write_iterator_ = true;
-  // Save reservation for both the read and write iterators.
-  buffer_pool_client_->SaveReservation(&read_page_reservation_, 
default_page_len_);
-  buffer_pool_client_->SaveReservation(&write_page_reservation_, 
default_page_len_);
-  RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read));
-  return Status::OK();
-}
-
-// Releases every page of the stream and marks the stream closed. When
-// 'batch' is non-null, the buffer of each page whose 'retrieved_buffer' flag
-// is set is extracted and attached to 'batch' with flush mode 'flush'; all
-// other pages are destroyed. Both saved reservations are closed and the
-// page and pin counters are reset.
-void BufferedTupleStreamV2::Close(RowBatch* batch, RowBatch::FlushMode flush) {
-  for (Page& page : pages_) {
-    if (batch != nullptr && page.retrieved_buffer) {
-      // Subtle: We only need to attach buffers from pages that we may have
-      // returned references to. ExtractBuffer() cannot fail for these pages
-      // because the data is guaranteed to already be in memory.
-      BufferPool::BufferHandle buffer;
-      Status status = buffer_pool_->ExtractBuffer(buffer_pool_client_, 
&page.handle, &buffer);
-      DCHECK(status.ok());
-      batch->AddBuffer(buffer_pool_client_, move(buffer), flush);
-    } else {
-      buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle);
-    }
-  }
-  read_page_reservation_.Close();
-  write_page_reservation_.Close();
-  pages_.clear();
-  num_pages_ = 0;
-  bytes_pinned_ = 0;
-  closed_ = true;
-}
-
-// Recomputes the number of pinned bytes from scratch by summing
-// pin_count() * len() over every page. Used to cross-check 'bytes_pinned_'.
-int64_t BufferedTupleStreamV2::CalcBytesPinned() const {
-  int64_t pinned_bytes = 0;
-  for (auto it = pages_.begin(); it != pages_.end(); ++it) {
-    pinned_bytes += it->pin_count() * it->len();
-  }
-  return pinned_bytes;
-}
-
-// Pins 'page' through the buffer pool and adds its length to
-// 'bytes_pinned_'. If the pin fails the error is returned and the counter
-// is not changed.
-Status BufferedTupleStreamV2::PinPage(Page* page) {
-  RETURN_IF_ERROR(buffer_pool_->Pin(buffer_pool_client_, &page->handle));
-  bytes_pinned_ += page->len();
-  return Status::OK();
-}
-
-// Returns the pin count 'page' should have if the stream's pinned state were
-// 'stream_pinned': 1 when the stream is pinned or the page is the current
-// read or write page, otherwise 0.
-int BufferedTupleStreamV2::ExpectedPinCount(
-    bool stream_pinned, const Page* page) const {
-  if (stream_pinned) return 1;
-  if (is_read_page(page)) return 1;
-  return is_write_page(page) ? 1 : 0;
-}
-
-// Pins 'page' if its current pin count is below what ExpectedPinCount()
-// requires for 'stream_pinned'. Does nothing when the counts already agree.
-// Returns the status of the pin operation.
-Status BufferedTupleStreamV2::PinPageIfNeeded(Page* page, bool stream_pinned) {
-  const int target_pin_count = ExpectedPinCount(stream_pinned, page);
-  if (target_pin_count == page->pin_count()) return Status::OK();
-  // The only valid transition here is a single extra pin.
-  DCHECK_EQ(target_pin_count, page->pin_count() + 1);
-  return PinPage(page);
-}
-
-// Unpins 'page' if its current pin count is above what ExpectedPinCount()
-// requires for 'stream_pinned', and subtracts its length from
-// 'bytes_pinned_'. Once the pin count reaches zero the page's
-// 'retrieved_buffer' flag is cleared.
-void BufferedTupleStreamV2::UnpinPageIfNeeded(Page* page, bool stream_pinned) {
-  const int target_pin_count = ExpectedPinCount(stream_pinned, page);
-  if (target_pin_count == page->pin_count()) return;
-  // The only valid transition here is dropping a single pin.
-  DCHECK_EQ(target_pin_count, page->pin_count() - 1);
-  buffer_pool_->Unpin(buffer_pool_client_, &page->handle);
-  bytes_pinned_ -= page->len();
-  if (page->pin_count() == 0) page->retrieved_buffer = false;
-}
-
-// Whether write reservation must currently be saved, evaluated for the
-// stream's actual pinned state.
-bool BufferedTupleStreamV2::NeedWriteReservation() const {
-  return NeedWriteReservation(pinned_);
-}
-
-// Whether write reservation would need to be saved if the stream's pinned
-// state were 'stream_pinned'; all other inputs come from the current state.
-bool BufferedTupleStreamV2::NeedWriteReservation(bool stream_pinned) const {
-  return NeedWriteReservation(stream_pinned, num_pages_, has_write_iterator(),
-      write_page_ != nullptr, has_read_write_page());
-}
-
-// Pure predicate over an explicit description of the stream state: returns
-// true if 'default_page_len_' bytes of write reservation should be held in
-// the saved write reservation for that state.
-bool BufferedTupleStreamV2::NeedWriteReservation(bool stream_pinned,
-    int64_t num_pages, bool has_write_iterator, bool has_write_page,
-    bool has_read_write_page) {
-  // Nothing is saved when there is no write iterator.
-  if (!has_write_iterator) return false;
-  // If the stream is empty the write reservation hasn't been used yet.
-  if (num_pages == 0) return true;
-  if (!stream_pinned) {
-    // Unpinned: keep the reservation saved unless it is being used to pin a
-    // page in the stream.
-    return has_read_write_page || !has_write_page;
-  }
-  // Pinned: keep the reservation saved for the next page only when the sole
-  // page is a read/write page.
-  return num_pages == 1 && has_read_write_page;
-}
-
-// Whether read reservation must currently be saved, evaluated for the
-// stream's actual pinned state.
-bool BufferedTupleStreamV2::NeedReadReservation() const {
-  return NeedReadReservation(pinned_);
-}
-
-// Whether read reservation would need to be saved if the stream's pinned
-// state were 'stream_pinned'; all other inputs come from the current state.
-bool BufferedTupleStreamV2::NeedReadReservation(bool stream_pinned) const {
-  return NeedReadReservation(
-      stream_pinned, num_pages_, has_read_iterator(), read_page_ != 
pages_.end());
-}
-
-// Variant that takes the read-side state explicitly and fills in the
-// write-side state (write iterator, write page) from the stream.
-bool BufferedTupleStreamV2::NeedReadReservation(bool stream_pinned, int64_t 
num_pages,
-    bool has_read_iterator, bool has_read_page) const {
-  return NeedReadReservation(stream_pinned, num_pages, has_read_iterator, 
has_read_page,
-      has_write_iterator(), write_page_ != nullptr);
-}
-
-// Pure predicate over an explicit description of the stream state: returns
-// true if 'default_page_len_' bytes of read reservation should be held in
-// the saved read reservation for that state. 'has_write_page' is accepted
-// but does not influence the result.
-bool BufferedTupleStreamV2::NeedReadReservation(bool stream_pinned,
-    int64_t num_pages, bool has_read_iterator, bool has_read_page,
-    bool has_write_iterator, bool has_write_page) {
-  // Nothing is saved when there is no read iterator.
-  if (!has_read_iterator) return false;
-  if (!stream_pinned) {
-    // Unpinned: only save reservation if there is no read page and we may
-    // advance to one in the future.
-    return !has_read_page && (has_write_iterator || num_pages > 0);
-  }
-  // Pinned: need reservation if there are no pages currently pinned for
-  // reading but we may add a page.
-  return has_write_iterator && num_pages == 0;
-}
-
-// Creates a new page of 'page_len' bytes, appends it to the stream and makes
-// it the write page, pointing the write pointers at its buffer. There must
-// be no current write page. If page creation fails the error is returned
-// and the stream is not modified.
-Status BufferedTupleStreamV2::NewWritePage(int64_t page_len) noexcept {
-  DCHECK(!closed_);
-  DCHECK(write_page_ == nullptr);
-
-  Page new_page;
-  const BufferHandle* write_buffer;
-  RETURN_IF_ERROR(buffer_pool_->CreatePage(
-      buffer_pool_client_, page_len, &new_page.handle, &write_buffer));
-  // The new page counts towards both the pinned bytes and the stream's total
-  // size.
-  bytes_pinned_ += page_len;
-  total_byte_size_ += page_len;
-
-  pages_.push_back(std::move(new_page));
-  ++num_pages_;
-  // Take the address of the element now stored in the list, not of the
-  // moved-from local.
-  write_page_ = &pages_.back();
-  DCHECK_EQ(write_page_->num_rows, 0);
-  write_ptr_ = write_buffer->data();
-  write_end_ptr_ = write_ptr_ + page_len;
-  return Status::OK();
-}
-
-// Computes the length of the page needed to hold a row of 'row_size' bytes:
-// the row size rounded up to a power of two, but never less than
-// 'default_page_len_'. Returns a MAX_ROW_SIZE error, leaving '*page_len'
-// unwritten, if the row is larger than 'max_page_len_'.
-Status BufferedTupleStreamV2::CalcPageLenForRow(
-    int64_t row_size, int64_t* page_len) {
-  if (UNLIKELY(row_size > max_page_len_)) {
-    // Report the limit that was actually exceeded. Previously a constant 0
-    // was passed here, so the message always quoted a limit of zero bytes.
-    // NOTE(review): confirm that the third argument of the MAX_ROW_SIZE
-    // message template is meant to be this limit.
-    return Status(TErrorCode::MAX_ROW_SIZE,
-        PrettyPrinter::Print(row_size, TUnit::BYTES), node_id_,
-        PrettyPrinter::Print(max_page_len_, TUnit::BYTES));
-  }
-  *page_len = max(default_page_len_, BitUtil::RoundUpToPowerOfTwo(row_size));
-  return Status::OK();
-}
-
-// Replaces the current write page (if any) with a new page large enough for
-// a row of 'row_size' bytes. All reservation needed is acquired before the
-// stream is modified: saved read/write reservation that will no longer be
-// needed and the reservation freed by unpinning the old write page are
-// subtracted from the amount requested. Sets '*got_reservation' to false and
-// returns OK, leaving the stream unchanged, if the reservation cannot be
-// obtained. Note that '*got_reservation' is not written when
-// CalcPageLenForRow() fails.
-Status BufferedTupleStreamV2::AdvanceWritePage(
-    int64_t row_size, bool* got_reservation) noexcept {
-  DCHECK(has_write_iterator());
-  CHECK_CONSISTENCY_FAST();
-
-  int64_t page_len;
-  RETURN_IF_ERROR(CalcPageLenForRow(row_size, &page_len));
-
-  // Reservation may have been saved for the next write page, e.g. by
-  // PrepareForWrite() if the stream is empty.
-  int64_t write_reservation_to_restore = 0, read_reservation_to_restore = 0;
-  // Compare "needed now" with "needed once the new page exists"; only a
-  // true -> false transition means saved reservation can be restored.
-  if (NeedWriteReservation(
-          pinned_, num_pages_, true, write_page_ != nullptr, 
has_read_write_page())
-      && !NeedWriteReservation(pinned_, num_pages_ + 1, true, true, false)) {
-    write_reservation_to_restore = default_page_len_;
-  }
-  // If the stream is pinned, we need to keep the previous write page pinned
-  // for reading. Check if we saved reservation for this case.
-  if (NeedReadReservation(pinned_, num_pages_, has_read_iterator(),
-          read_page_ != pages_.end(), true, write_page_ != nullptr)
-      && !NeedReadReservation(pinned_, num_pages_ + 1, has_read_iterator(),
-             read_page_ != pages_.end(), true, true)) {
-    read_reservation_to_restore = default_page_len_;
-  }
-
-  // We may reclaim reservation by unpinning a page that was pinned for
-  // writing.
-  int64_t write_page_reservation_to_reclaim =
-      (write_page_ != nullptr && !pinned_ && !has_read_write_page()) ?
-      write_page_->len() : 0;
-  // Check to see if we can get the reservation before changing the state of
-  // the stream.
-  if (!buffer_pool_client_->IncreaseReservationToFit(page_len
-          - write_reservation_to_restore - read_reservation_to_restore
-          - write_page_reservation_to_reclaim)) {
-    DCHECK(pinned_ || page_len > default_page_len_)
-        << "If the stream is unpinned, this should only fail for large pages";
-    CHECK_CONSISTENCY_FAST();
-    *got_reservation = false;
-    return Status::OK();
-  }
-  if (write_reservation_to_restore > 0) {
-    buffer_pool_client_->RestoreReservation(
-        &write_page_reservation_, write_reservation_to_restore);
-  }
-  if (read_reservation_to_restore > 0) {
-    buffer_pool_client_->RestoreReservation(
-        &read_page_reservation_, read_reservation_to_restore);
-  }
-  // Drop the old write page (unpinning it if the mode requires) before
-  // creating its replacement.
-  ResetWritePage();
-  RETURN_IF_ERROR(NewWritePage(page_len));
-  *got_reservation = true;
-  return Status::OK();
-}
-
-// Clears the write page and write pointers, then unpins the former write
-// page if it no longer needs to be pinned. No-op when there is no write
-// page.
-void BufferedTupleStreamV2::ResetWritePage() {
-  if (write_page_ == nullptr) return;
-  // Unpin the write page if we're reading in unpinned mode.
-  Page* prev_write_page = write_page_;
-  write_page_ = nullptr;
-  write_ptr_ = nullptr;
-  write_end_ptr_ = nullptr;
-
-  // May need to decrement pin count now that it's not the write page,
-  // depending on the stream's mode. 'write_page_' is cleared first so that
-  // ExpectedPinCount() no longer sees this page as the write page.
-  UnpinPageIfNeeded(prev_write_page, pinned_);
-}
-
-// Destroys the write iterator, if there is one: resets the write page,
-// closes the saved write reservation and restores the saved read reservation
-// if it is no longer needed once no more pages can be appended.
-void BufferedTupleStreamV2::InvalidateWriteIterator() {
-  if (!has_write_iterator()) return;
-  ResetWritePage();
-  has_write_iterator_ = false;
-  // No more pages will be appended to stream - do not need any write
-  // reservation.
-  write_page_reservation_.Close();
-  // May not need a read reservation once the write iterator is invalidated.
-  // Compares the need with a write iterator against the need without one.
-  if (NeedReadReservation(pinned_, num_pages_, has_read_iterator(),
-          read_page_ != pages_.end(), true, write_page_ != nullptr)
-      && !NeedReadReservation(pinned_, num_pages_, has_read_iterator(),
-             read_page_ != pages_.end(), false, false)) {
-    buffer_pool_client_->RestoreReservation(&read_page_reservation_, 
default_page_len_);
-  }
-}
-
-// Advances the read iterator to the next page, or to the first page if no
-// page has been read yet. The previous read page is destroyed when
-// 'delete_on_read_' is set and otherwise unpinned if the mode requires it.
-// If a next page exists it is pinned if necessary, its buffer is fetched
-// and the read pointers are set to cover it. Returns an INTERNAL_ERROR when
-// an unpinned stream reaches a larger-than-default page and the client does
-// not have enough unused reservation to pin it.
-Status BufferedTupleStreamV2::NextReadPage() {
-  DCHECK(has_read_iterator());
-  DCHECK(!closed_);
-  CHECK_CONSISTENCY_FAST();
-
-  if (read_page_ == pages_.end()) {
-    // No rows read yet - start reading at first page. If the stream is
-    // unpinned, we can use the reservation saved in PrepareForReadWrite() to
-    // pin the first page.
-    read_page_ = pages_.begin();
-    if (NeedReadReservation(pinned_, num_pages_, true, false)
-        && !NeedReadReservation(pinned_, num_pages_, true, true)) {
-      buffer_pool_client_->RestoreReservation(&read_page_reservation_, 
default_page_len_);
-    }
-  } else if (delete_on_read_) {
-    DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " "
-                                         << DebugString();
-    DCHECK_NE(&*read_page_, write_page_);
-    // The page just read is always the first page here; remove it from the
-    // stream.
-    bytes_pinned_ -= pages_.front().len();
-    buffer_pool_->DestroyPage(buffer_pool_client_, &pages_.front().handle);
-    pages_.pop_front();
-    --num_pages_;
-    read_page_ = pages_.begin();
-  } else {
-    // Unpin pages after reading them if needed.
-    Page* prev_read_page = &*read_page_;
-    ++read_page_;
-    UnpinPageIfNeeded(prev_read_page, pinned_);
-  }
-
-  // Ran off the end of the stream: there is no page to pin.
-  if (read_page_ == pages_.end()) {
-    CHECK_CONSISTENCY_FULL();
-    return Status::OK();
-  }
-
-  if (!pinned_ && read_page_->len() > default_page_len_
-      && buffer_pool_client_->GetUnusedReservation() < read_page_->len()) {
-    // If we are iterating over an unpinned stream and encounter a page that
-    // is larger than the default page length, then unpinning the previous
-    // page may not have freed up enough reservation to pin the next one. The
-    // client is responsible for ensuring the reservation is available, so
-    // this indicates a bug.
-    return Status(TErrorCode::INTERNAL_ERROR, Substitute("Internal error: 
couldn't pin "
-          "large page of $0 bytes, client only had $1 bytes of unused 
reservation:\n$2",
-          read_page_->len(), buffer_pool_client_->GetUnusedReservation(),
-          buffer_pool_client_->DebugString()));
-  }
-  // Ensure the next page is pinned for reading. By this point we should have
-  // enough reservation to pin the page. If the stream is pinned, the page is
-  // already pinned. If the stream is unpinned, we freed up enough memory for
-  // a default-sized page by deleting or unpinning the previous page and
-  // ensured that, if the page was larger, that the reservation is available
-  // with the above check.
-  RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
-
-  // This waits for the pin to complete if the page was unpinned earlier.
-  const BufferHandle* read_buffer;
-  RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer));
-
-  read_page_rows_returned_ = 0;
-  read_ptr_ = read_buffer->data();
-  read_end_ptr_ = read_ptr_ + read_buffer->len();
-
-  // We may need to save reservation for the write page in the case when the
-  // write page became a read/write page.
-  if (!NeedWriteReservation(pinned_, num_pages_, has_write_iterator(),
-             write_page_ != nullptr, false)
-      && NeedWriteReservation(pinned_, num_pages_, has_write_iterator(),
-             write_page_ != nullptr, has_read_write_page())) {
-    buffer_pool_client_->SaveReservation(&write_page_reservation_, 
default_page_len_);
-  }
-  CHECK_CONSISTENCY_FAST();
-  return Status::OK();
-}
-
-// Destroys the read iterator: clears the read page and read pointers,
-// unpins the former read page if it no longer needs to be pinned and
-// returns any saved read reservation to the client. 'delete_on_read_' is
-// cleared when no rows had been returned.
-void BufferedTupleStreamV2::InvalidateReadIterator() {
-  if (read_page_ != pages_.end()) {
-    // Unpin the read page if we're reading in unpinned mode.
-    Page* prev_read_page = &*read_page_;
-    read_page_ = pages_.end();
-    read_ptr_ = nullptr;
-    read_end_ptr_ = nullptr;
-
-    // May need to decrement pin count after destroying read iterator.
-    UnpinPageIfNeeded(prev_read_page, pinned_);
-  }
-  has_read_iterator_ = false;
-  if (read_page_reservation_.GetReservation() > 0) {
-    buffer_pool_client_->RestoreReservation(&read_page_reservation_, 
default_page_len_);
-  }
-  // It is safe to re-read a delete-on-read stream if no rows were read and
-  // no pages were therefore deleted.
-  if (rows_returned_ == 0) delete_on_read_ = false;
-}
-
-// Prepares the stream for reading from the beginning. Any existing write and
-// read iterators are invalidated first. '*got_reservation' is true when no
-// extra reservation is needed (the stream is pinned or has no pages) or the
-// reservation for one default-sized page could be obtained; otherwise it is
-// false and OK is returned without creating a read iterator.
-Status BufferedTupleStreamV2::PrepareForRead(bool delete_on_read, bool* 
got_reservation) {
-  CHECK_CONSISTENCY_FULL();
-  InvalidateWriteIterator();
-  InvalidateReadIterator();
-  // If already pinned, no additional pin is needed (see ExpectedPinCount()).
-  *got_reservation = pinned_ || pages_.empty()
-      || buffer_pool_client_->IncreaseReservationToFit(default_page_len_);
-  if (!*got_reservation) return Status::OK();
-  return PrepareForReadInternal(delete_on_read);
-}
-
-// Creates the read iterator. With no pages the read position is left at the
-// end; otherwise the first page is pinned if necessary, its buffer is
-// fetched and the read pointers are set to cover it. Resets the returned-row
-// counters and records 'delete_on_read'. Returns an error if pinning or
-// fetching the buffer fails.
-Status BufferedTupleStreamV2::PrepareForReadInternal(bool delete_on_read) {
-  DCHECK(!closed_);
-  DCHECK(!delete_on_read_);
-  DCHECK(!has_read_iterator());
-
-  has_read_iterator_ = true;
-  if (pages_.empty()) {
-    // No rows to return, or the first read/write page has not yet been
-    // allocated.
-    read_page_ = pages_.end();
-    read_ptr_ = nullptr;
-    read_end_ptr_ = nullptr;
-  } else {
-    // Eagerly pin the first page in the stream.
-    read_page_ = pages_.begin();
-    // Check if we need to increment the pin count of the read page.
-    RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
-    DCHECK(read_page_->is_pinned());
-
-    // This waits for the pin to complete if the page was unpinned earlier.
-    const BufferHandle* read_buffer;
-    RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer));
-    read_ptr_ = read_buffer->data();
-    read_end_ptr_ = read_ptr_ + read_buffer->len();
-  }
-  read_page_rows_returned_ = 0;
-  rows_returned_ = 0;
-  delete_on_read_ = delete_on_read;
-  CHECK_CONSISTENCY_FULL();
-  return Status::OK();
-}
-
-// Switches the stream to pinned mode. '*pinned' is set to true if the
-// stream is pinned on return. If the extra reservation needed to pin every
-// page cannot be obtained, the stream is left unpinned, '*pinned' is false
-// and OK is returned. Saved read/write reservation that a pinned stream
-// does not need is counted towards the requirement and restored to the
-// client.
-// NOTE(review): if PinPageIfNeeded() fails part-way through the final loop,
-// the function returns the error with some pages already pinned and
-// 'pinned_' still false.
-Status BufferedTupleStreamV2::PinStream(bool* pinned) {
-  DCHECK(!closed_);
-  CHECK_CONSISTENCY_FULL();
-  if (pinned_) {
-    *pinned = true;
-    return Status::OK();
-  }
-  *pinned = false;
-  // First, make sure we have the reservation to pin all the pages for
-  // reading.
-  int64_t bytes_to_pin = 0;
-  for (Page& page : pages_) {
-    bytes_to_pin += (ExpectedPinCount(true, &page) - page.pin_count()) * 
page.len();
-  }
-
-  // Check if we have some reservation to restore.
-  bool restore_write_reservation =
-      NeedWriteReservation(false) && !NeedWriteReservation(true);
-  bool restore_read_reservation =
-      NeedReadReservation(false) && !NeedReadReservation(true);
-  int64_t increase_needed = bytes_to_pin
-      - (restore_write_reservation ? default_page_len_ : 0)
-      - (restore_read_reservation ? default_page_len_ : 0);
-  bool reservation_granted =
-      buffer_pool_client_->IncreaseReservationToFit(increase_needed);
-  if (!reservation_granted) return Status::OK();
-
-  // If there is no current write page we should have some saved reservation
-  // to use. Only continue saving it if the stream is empty and need it to
-  // pin the first page.
-  if (restore_write_reservation) {
-    buffer_pool_client_->RestoreReservation(&write_page_reservation_, 
default_page_len_);
-  }
-  if (restore_read_reservation) {
-    buffer_pool_client_->RestoreReservation(&read_page_reservation_, 
default_page_len_);
-  }
-
-  // At this point success is guaranteed - go through to pin the pages we
-  // need to pin. If the page data was evicted from memory, the read I/O can
-  // happen in parallel because we defer calling GetBuffer() until
-  // NextReadPage().
-  for (Page& page : pages_) RETURN_IF_ERROR(PinPageIfNeeded(&page, true));
-
-  pinned_ = true;
-  *pinned = true;
-  CHECK_CONSISTENCY_FULL();
-  return Status::OK();
-}
-
-// Switches the stream to unpinned mode. With UNPIN_ALL both iterators are
-// invalidated first so that they do not keep pages pinned. If the stream
-// was pinned, every page that no longer needs a pin is unpinned and read or
-// write reservation that an unpinned stream needs is saved out of the
-// reservation that was freed.
-void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) {
-  CHECK_CONSISTENCY_FULL();
-  DCHECK(!closed_);
-  if (mode == UNPIN_ALL) {
-    // Invalidate the iterators so they don't keep pages pinned.
-    InvalidateWriteIterator();
-    InvalidateReadIterator();
-  }
-
-  if (pinned_) {
-    CHECK_CONSISTENCY_FULL();
-    // If the stream was pinned, there may be some remaining pinned pages
-    // that should be unpinned at this point.
-    for (Page& page : pages_) UnpinPageIfNeeded(&page, false);
-
-    // Check to see if we need to save some of the reservation we freed up.
-    if (!NeedWriteReservation(true) && NeedWriteReservation(false)) {
-      buffer_pool_client_->SaveReservation(&write_page_reservation_, 
default_page_len_);
-    }
-    if (!NeedReadReservation(true) && NeedReadReservation(false)) {
-      buffer_pool_client_->SaveReservation(&read_page_reservation_, 
default_page_len_);
-    }
-    pinned_ = false;
-  }
-  CHECK_CONSISTENCY_FULL();
-}
-
-// Pins the whole stream and reads all of its rows into a newly allocated
-// RowBatch stored in '*batch'. '*got_rows' is false, and no batch is
-// created, if the stream could not be pinned. Returns an error if the
-// stream has more rows than fit in an int or if pinning or reading fails.
-Status BufferedTupleStreamV2::GetRows(
-    MemTracker* tracker, scoped_ptr<RowBatch>* batch, bool* got_rows) {
-  if (num_rows() > numeric_limits<int>::max()) {
-    // RowBatch::num_rows_ is a 32-bit int, avoid an overflow.
-    return Status(Substitute("Trying to read $0 rows into in-memory batch 
failed. Limit "
-                             "is $1",
-        num_rows(), numeric_limits<int>::max()));
-  }
-  RETURN_IF_ERROR(PinStream(got_rows));
-  if (!*got_rows) return Status::OK();
-  bool got_reservation;
-  RETURN_IF_ERROR(PrepareForRead(false, &got_reservation));
-  DCHECK(got_reservation) << "Stream was pinned";
-  // The batch is sized to hold every row in the stream.
-  batch->reset(new RowBatch(desc_, num_rows(), tracker));
-  bool eos = false;
-  // Loop until GetNext fills the entire batch. Each call can stop at page
-  // boundaries. We generally want it to stop, so that pages can be freed
-  // as we read. It is safe in this case because we pin the entire stream.
-  while (!eos) {
-    RETURN_IF_ERROR(GetNext(batch->get(), &eos));
-  }
-  return Status::OK();
-}
-
-// Reads the next rows into 'batch' without collecting flat row pointers.
-Status BufferedTupleStreamV2::GetNext(RowBatch* batch, bool* eos) {
-  return GetNextInternal<false>(batch, eos, nullptr);
-}
-
-// Reads the next rows into 'batch' and additionally fills 'flat_rows' with
-// the in-stream position of each returned row.
-Status BufferedTupleStreamV2::GetNext(
-    RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
-  return GetNextInternal<true>(batch, eos, flat_rows);
-}
-
-// Dispatches to the fully specialized GetNextInternal(), selecting the
-// HAS_NULLABLE_TUPLE template argument from 'has_nullable_tuple_'.
-template <bool FILL_FLAT_ROWS>
-Status BufferedTupleStreamV2::GetNextInternal(
-    RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
-  return has_nullable_tuple_ ?
-      GetNextInternal<FILL_FLAT_ROWS, true>(batch, eos, flat_rows) :
-      GetNextInternal<FILL_FLAT_ROWS, false>(batch, eos, flat_rows);
-}
-
-// Appends rows from the current read page to 'batch', stopping at the end
-// of the page or when the batch is full, whichever comes first. Advances to
-// the next page first if there is no read page or the current one is
-// exhausted. '*eos' is set to true once every row in the stream has been
-// returned. When FILL_FLAT_ROWS is set, 'flat_rows' is cleared and filled
-// with the position in the page of each returned row. If the page was fully
-// consumed and the stream is unpinned or deletes pages on read, the batch is
-// marked as needing a deep copy.
-template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE>
-Status BufferedTupleStreamV2::GetNextInternal(
-    RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) {
-  DCHECK(!closed_);
-  DCHECK(batch->row_desc()->Equals(*desc_));
-  DCHECK(is_pinned() || !FILL_FLAT_ROWS)
-      << "FlatRowPtrs are only valid for pinned streams";
-  *eos = (rows_returned_ == num_rows_);
-  if (*eos) return Status::OK();
-
-  if (UNLIKELY(read_page_ == pages_.end()
-          || read_page_rows_returned_ == read_page_->num_rows)) {
-    // Get the next page in the stream (or the first page if read_page_ was
-    // not yet initialized.) We need to do this at the beginning of the
-    // GetNext() call to ensure the buffer management semantics.
-    // NextReadPage() may unpin or delete the buffer backing the rows
-    // returned from the *previous* call to GetNext().
-    RETURN_IF_ERROR(NextReadPage());
-  }
-
-  DCHECK(has_read_iterator());
-  DCHECK(read_page_ != pages_.end());
-  DCHECK(read_page_->is_pinned()) << DebugString();
-  DCHECK_GE(read_page_rows_returned_, 0);
-
-  int rows_left_in_page = read_page_->num_rows - read_page_rows_returned_;
-  int rows_to_fill = std::min(batch->capacity() - batch->num_rows(), 
rows_left_in_page);
-  DCHECK_GE(rows_to_fill, 1);
-  uint8_t* tuple_row_mem = 
reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows()));
-
-  // Produce tuple rows from the current page and the corresponding position
-  // on the null tuple indicator.
-  if (FILL_FLAT_ROWS) {
-    DCHECK(flat_rows != nullptr);
-    DCHECK(!delete_on_read_);
-    DCHECK_EQ(batch->num_rows(), 0);
-    flat_rows->clear();
-    flat_rows->reserve(rows_to_fill);
-  }
-
-  const uint64_t tuples_per_row = desc_->tuple_descriptors().size();
-  // Start reading from the current position in 'read_page_'.
-  for (int i = 0; i < rows_to_fill; ++i) {
-    if (FILL_FLAT_ROWS) {
-      // Record where this row starts before 'read_ptr_' is advanced past it.
-      flat_rows->push_back(read_ptr_);
-      DCHECK_EQ(flat_rows->size(), i + 1);
-    }
-    // Copy the row into the output batch.
-    TupleRow* output_row = reinterpret_cast<TupleRow*>(tuple_row_mem);
-    tuple_row_mem += sizeof(Tuple*) * tuples_per_row;
-    UnflattenTupleRow<HAS_NULLABLE_TUPLE>(&read_ptr_, output_row);
-
-    // Update string slot ptrs, skipping external strings.
-    for (int j = 0; j < inlined_string_slots_.size(); ++j) {
-      Tuple* tuple = output_row->GetTuple(inlined_string_slots_[j].first);
-      if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
-      FixUpStringsForRead(inlined_string_slots_[j].second, tuple);
-    }
-
-    // Update collection slot ptrs, skipping external collections. We
-    // traverse the collection structure in the same order as it was written
-    // to the stream, allowing us to infer the data layout based on the
-    // length of collections and strings.
-    for (int j = 0; j < inlined_coll_slots_.size(); ++j) {
-      Tuple* tuple = output_row->GetTuple(inlined_coll_slots_[j].first);
-      if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
-      FixUpCollectionsForRead(inlined_coll_slots_[j].second, tuple);
-    }
-  }
-
-  batch->CommitRows(rows_to_fill);
-  rows_returned_ += rows_to_fill;
-  read_page_rows_returned_ += rows_to_fill;
-  *eos = (rows_returned_ == num_rows_);
-  if (read_page_rows_returned_ == read_page_->num_rows && (!pinned_ || 
delete_on_read_)) {
-    // No more data in this page. The batch must be immediately returned up
-    // the operator tree and deep copied so that NextReadPage() can reuse the
-    // read page's buffer.
-    // TODO: IMPALA-4179 - instead attach the buffer and flush the resources.
-    batch->MarkNeedsDeepCopy();
-  }
-  if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill);
-  DCHECK_LE(read_ptr_, read_end_ptr_);
-  return Status::OK();
-}
-
-// Repoints each non-NULL string slot of 'tuple' at the bytes starting at 'read_ptr_'.
-// The slots are visited in the order given by 'string_slots' and 'read_ptr_' is
-// advanced by each slot's 'len', so only the lengths stored in the tuple are needed
-// to locate the string data.
-void BufferedTupleStreamV2::FixUpStringsForRead(
-    const vector<SlotDescriptor*>& string_slots, Tuple* tuple) {
-  DCHECK(tuple != nullptr);
-  for (const SlotDescriptor* slot : string_slots) {
-    const bool slot_is_null = tuple->IsNull(slot->null_indicator_offset());
-    if (!slot_is_null) {
-      StringValue* str = tuple->GetStringSlot(slot->tuple_offset());
-      DCHECK_LE(read_ptr_ + str->len, read_end_ptr_);
-      str->ptr = reinterpret_cast<char*>(read_ptr_);
-      read_ptr_ += str->len;
-    }
-  }
-}
-
-// Repoints every non-NULL collection slot of 'tuple' at the item tuples that start
-// at 'read_ptr_', then advances 'read_ptr_' past them. When the item descriptor has
-// var-len slots, each item's strings and nested collections are fixed up
-// recursively, one item at a time.
-void BufferedTupleStreamV2::FixUpCollectionsForRead(
-    const vector<SlotDescriptor*>& collection_slots, Tuple* tuple) {
-  DCHECK(tuple != nullptr);
-  for (const SlotDescriptor* slot_desc : collection_slots) {
-    if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
-
-    CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
-    const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
-    // Widen before multiplying so the byte count of a very large collection cannot
-    // overflow 'int'.
-    const int64_t item_byte_size = item_desc.byte_size();
-    const int64_t coll_byte_size = cv->num_tuples * item_byte_size;
-    DCHECK_LE(read_ptr_ + coll_byte_size, read_end_ptr_);
-    cv->ptr = reinterpret_cast<uint8_t*>(read_ptr_);
-    read_ptr_ += coll_byte_size;
-
-    if (!item_desc.HasVarlenSlots()) continue;
-    uint8_t* coll_data = cv->ptr;
-    for (int i = 0; i < cv->num_tuples; ++i) {
-      Tuple* item = reinterpret_cast<Tuple*>(coll_data);
-      FixUpStringsForRead(item_desc.string_slots(), item);
-      FixUpCollectionsForRead(item_desc.collection_slots(), item);
-      coll_data += item_byte_size;
-    }
-  }
-}
-
-// Returns the number of bytes 'row' contributes to the total computed here: the
-// null indicator bytes (only if 'has_nullable_tuple_'), the fixed-length part of
-// each tuple (NULL tuples are skipped when 'has_nullable_tuple_'), the lengths of
-// all non-NULL inlined string slots, and the item tuples plus var-len data of all
-// non-NULL inlined collection slots.
-int64_t BufferedTupleStreamV2::ComputeRowSize(TupleRow* row) const noexcept {
-  int64_t size = 0;
-  if (has_nullable_tuple_) {
-    size += NullIndicatorBytesPerRow();
-    for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) {
-      if (row->GetTuple(i) != nullptr) size += fixed_tuple_sizes_[i];
-    }
-  } else {
-    for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) {
-      size += fixed_tuple_sizes_[i];
-    }
-  }
-
-  for (const auto& tuple_slots : inlined_string_slots_) {
-    Tuple* tuple = row->GetTuple(tuple_slots.first);
-    if (tuple == nullptr) continue;
-    for (const SlotDescriptor* slot_desc : tuple_slots.second) {
-      if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
-      size += tuple->GetStringSlot(slot_desc->tuple_offset())->len;
-    }
-  }
-
-  for (const auto& tuple_slots : inlined_coll_slots_) {
-    Tuple* tuple = row->GetTuple(tuple_slots.first);
-    if (tuple == nullptr) continue;
-    for (const SlotDescriptor* slot_desc : tuple_slots.second) {
-      if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
-      CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
-      const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
-      // Multiply in 64 bits: the product of the item count and the item size can
-      // exceed the range of 'int' for very large collections.
-      const int64_t item_byte_size = item_desc.byte_size();
-      size += cv->num_tuples * item_byte_size;
-
-      if (!item_desc.HasVarlenSlots()) continue;
-      // Step through the items by pointer so no 'int' byte offset is ever formed.
-      uint8_t* item_mem = cv->ptr;
-      for (int j = 0; j < cv->num_tuples; ++j) {
-        Tuple* item = reinterpret_cast<Tuple*>(item_mem);
-        size += item->VarlenByteSize(item_desc);
-        item_mem += item_byte_size;
-      }
-    }
-  }
-  return size;
-}
-
-// Slow path of AddRow(): computes the exact size of 'row', obtains space for it via
-// AddRowCustomBeginSlow(), and deep-copies the row into that space. Returns false
-// (with 'status' as set by AddRowCustomBeginSlow()) if no space could be obtained.
-bool BufferedTupleStreamV2::AddRowSlow(TupleRow* row, Status* status) noexcept {
-  // The AddRowCustom*() functions take care of advancing the write page.
-  const int64_t bytes_needed = ComputeRowSize(row);
-  uint8_t* dst = AddRowCustomBeginSlow(bytes_needed, status);
-  if (dst == nullptr) return false;
-
-  uint8_t* const dst_end = dst + bytes_needed;
-  bool copied = DeepCopy(row, &dst, dst_end);
-  DCHECK(copied);
-  DCHECK_EQ(dst, write_ptr_);
-  AddRowCustomEnd(bytes_needed);
-  return true;
-}
-
-// Slow path of AddRowCustomBegin(): advances to a write page that can hold 'size'
-// bytes and then retries the allocation. Returns nullptr if AdvanceWritePage()
-// reported an error through 'status' or could not get the reservation.
-uint8_t* BufferedTupleStreamV2::AddRowCustomBeginSlow(
-    int64_t size, Status* status) noexcept {
-  bool got_reservation;
-  *status = AdvanceWritePage(size, &got_reservation);
-  const bool page_ready = status->ok() && got_reservation;
-  if (!page_ready) return nullptr;
-
-  // The write page is now large enough, so the allocation is expected to succeed.
-  uint8_t* row_mem = AddRowCustomBegin(size, status);
-  DCHECK(row_mem != nullptr);
-  return row_mem;
-}
-
-// Finishes adding a row whose 'size' exceeds 'default_page_len_'.
-void BufferedTupleStreamV2::AddLargeRowCustomEnd(int64_t size) noexcept {
-  DCHECK_GT(size, default_page_len_);
-  // Give up the large write page right away: this stops us from holding on to extra
-  // reservation and from appending another row to the page.
-  ResetWritePage();
-  // Keep back part of the reservation that was just freed so that the next write
-  // page can be created when it is needed.
-  const bool keep_write_reservation = NeedWriteReservation();
-  if (keep_write_reservation) {
-    buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_);
-  }
-  // With the row added, the stream should be in a consistent state again.
-  CHECK_CONSISTENCY_FAST();
-}
-
-// Appends a deep copy of 'row' to the stream. The fast path copies directly into the
-// current write page; if there is no write page or the row does not fit, the work is
-// handed to AddRowSlow().
-bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) noexcept {
-  DCHECK(!closed_);
-  DCHECK(has_write_iterator());
-  // DeepCopy() is only attempted when a write page exists.
-  const bool copied_in_place =
-      write_page_ != nullptr && DeepCopy(row, &write_ptr_, write_end_ptr_);
-  if (UNLIKELY(!copied_in_place)) return AddRowSlow(row, status);
-
-  ++num_rows_;
-  ++write_page_->num_rows;
-  return true;
-}
-
-// Dispatches to the DeepCopyInternal() instantiation that matches
-// 'has_nullable_tuple_'.
-bool BufferedTupleStreamV2::DeepCopy(
-    TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept {
-  if (has_nullable_tuple_) return DeepCopyInternal<true>(row, data, data_end);
-  return DeepCopyInternal<false>(row, data, data_end);
-}
-
-// Serializes 'row' into the buffer starting at '*data' and ending at 'data_end':
-// first the null indicator bytes (only if HAS_NULLABLE_TUPLE), then the fixed-length
-// part of each tuple, then the inlined string data, then the inlined collection
-// data. On success '*data' is advanced past the row and true is returned. If the row
-// does not fit, false is returned and '*data' is left unchanged (bytes after '*data'
-// may already have been overwritten).
-// TODO: consider codegening this.
-// TODO: in case of duplicate tuples, this can redundantly serialize data.
-template <bool HAS_NULLABLE_TUPLE>
-bool BufferedTupleStreamV2::DeepCopyInternal(
-    TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept {
-  uint8_t* pos = *data;
-  const int tuples_per_row = desc_->tuple_descriptors().size();
-  // Copy the not NULL fixed len tuples. For the NULL tuples just update the NULL tuple
-  // indicator.
-  if (HAS_NULLABLE_TUPLE) {
-    const int null_indicator_bytes = NullIndicatorBytesPerRow();
-    if (UNLIKELY(pos + null_indicator_bytes > data_end)) return false;
-    uint8_t* null_indicators = pos;
-    pos += null_indicator_bytes;
-    memset(null_indicators, 0, null_indicator_bytes);
-    for (int i = 0; i < tuples_per_row; ++i) {
-      Tuple* t = row->GetTuple(i);
-      if (t == nullptr) {
-        // Tuple i is flagged by bit (7 - i % 8) of indicator byte i / 8.
-        const uint8_t mask = 1 << (7 - (i & 7));
-        null_indicators[i >> 3] |= mask;
-        continue;
-      }
-      const int tuple_size = fixed_tuple_sizes_[i];
-      if (UNLIKELY(pos + tuple_size > data_end)) return false;
-      memcpy(pos, t, tuple_size);
-      pos += tuple_size;
-    }
-  } else {
-    // If we know that there are no nullable tuples no need to set the nullability flags.
-    for (int i = 0; i < tuples_per_row; ++i) {
-      const int tuple_size = fixed_tuple_sizes_[i];
-      if (UNLIKELY(pos + tuple_size > data_end)) return false;
-      Tuple* t = row->GetTuple(i);
-      // TODO: Once IMPALA-1306 (Avoid passing empty tuples of non-materialized slots)
-      // is delivered, the check below should become DCHECK(t != nullptr).
-      DCHECK(t != nullptr || tuple_size == 0);
-      // 't' may be NULL for a zero-length tuple. memcpy() requires valid pointers even
-      // when the count is zero, so skip the call instead of passing NULL.
-      if (tuple_size == 0) continue;
-      memcpy(pos, t, tuple_size);
-      pos += tuple_size;
-    }
-  }
-
-  // Copy inlined string slots. Note: we do not need to convert the string ptrs to
-  // offsets on the write path, only on the read. The tuple data is immediately
-  // followed by the string data so only the len information is necessary.
-  for (int i = 0; i < inlined_string_slots_.size(); ++i) {
-    const Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first);
-    if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
-    if (UNLIKELY(!CopyStrings(tuple, inlined_string_slots_[i].second, &pos, data_end))) {
-      return false;
-    }
-  }
-
-  // Copy inlined collection slots. We copy collection data in a well-defined order so
-  // we do not need to convert pointers to offsets on the write path.
-  for (int i = 0; i < inlined_coll_slots_.size(); ++i) {
-    const Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first);
-    if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue;
-    if (UNLIKELY(
-            !CopyCollections(tuple, inlined_coll_slots_[i].second, &pos, data_end))) {
-      return false;
-    }
-  }
-  *data = pos;
-  return true;
-}
-
-// Appends the bytes of every non-NULL, non-empty string slot of 'tuple' at '*data',
-// in the order given by 'string_slots', advancing '*data' as it goes. Returns false
-// as soon as a string would extend past 'data_end'; strings copied before that point
-// have already advanced '*data'.
-bool BufferedTupleStreamV2::CopyStrings(const Tuple* tuple,
-    const vector<SlotDescriptor*>& string_slots, uint8_t** data,
-    const uint8_t* data_end) {
-  for (const SlotDescriptor* slot : string_slots) {
-    if (tuple->IsNull(slot->null_indicator_offset())) continue;
-    const StringValue* str = tuple->GetStringSlot(slot->tuple_offset());
-    // Nothing to copy for strings without a positive length.
-    if (UNLIKELY(!(str->len > 0))) continue;
-
-    uint8_t* dst = *data;
-    if (UNLIKELY(dst + str->len > data_end)) return false;
-    memcpy(dst, str->ptr, str->len);
-    *data = dst + str->len;
-  }
-  return true;
-}
-
-// Appends the item tuples of every non-NULL, non-empty collection slot of 'tuple' at
-// '*data'. When the item descriptor has var-len slots, the items' strings and nested
-// collections follow, item by item (strings first, then collections). '*data' is
-// advanced past whatever was written. Returns false as soon as the data would extend
-// past 'data_end'; '*data' may already have been advanced at that point.
-bool BufferedTupleStreamV2::CopyCollections(const Tuple* tuple,
-    const vector<SlotDescriptor*>& collection_slots, uint8_t** data,
-    const uint8_t* data_end) {
-  for (const SlotDescriptor* slot_desc : collection_slots) {
-    if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
-    const CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
-    const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
-    if (UNLIKELY(cv->num_tuples <= 0)) continue;
-
-    // Multiply in 64 bits so that a very large collection cannot overflow 'int', and
-    // compare against the remaining space rather than forming a pointer that may lie
-    // beyond the buffer.
-    const int64_t item_byte_size = item_desc.byte_size();
-    const int64_t coll_byte_size = cv->num_tuples * item_byte_size;
-    if (UNLIKELY(coll_byte_size > data_end - *data)) return false;
-    uint8_t* coll_data = *data;
-    memcpy(coll_data, cv->ptr, coll_byte_size);
-    *data += coll_byte_size;
-
-    if (!item_desc.HasVarlenSlots()) continue;
-    // Copy variable length data when present in collection items. The items are read
-    // from the copy that was just written at 'coll_data'.
-    for (int i = 0; i < cv->num_tuples; ++i) {
-      const Tuple* item = reinterpret_cast<Tuple*>(coll_data);
-      if (UNLIKELY(!CopyStrings(item, item_desc.string_slots(), data, data_end))) {
-        return false;
-      }
-      if (UNLIKELY(
-              !CopyCollections(item, item_desc.collection_slots(), data, data_end))) {
-        return false;
-      }
-      coll_data += item_byte_size;
-    }
-  }
-  return true;
-}
-
-// Fills 'row' with tuple pointers into the flattened row at 'flat_row'. Expects an
-// open, pinned stream that is not in delete-on-read mode (checked in debug builds).
-void BufferedTupleStreamV2::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const {
-  DCHECK(row != nullptr);
-  DCHECK(!closed_);
-  DCHECK(is_pinned());
-  DCHECK(!delete_on_read_);
-  // UnflattenTupleRow() advances the pointer it is given, so hand it a local copy.
-  uint8_t* cursor = flat_row;
-  if (has_nullable_tuple_) {
-    UnflattenTupleRow<true>(&cursor, row);
-  } else {
-    UnflattenTupleRow<false>(&cursor, row);
-  }
-}
-
-// Sets the tuple pointers of 'row' to the flattened tuples that start at '*data' and
-// advances '*data' past the null indicator bytes (only if HAS_NULLABLE_TUPLE) and the
-// fixed-length tuple data. Var-len data is not touched here.
-template <bool HAS_NULLABLE_TUPLE>
-void BufferedTupleStreamV2::UnflattenTupleRow(uint8_t** data, TupleRow* row) const {
-  // Branch on the template argument so that each instantiation only contains the path
-  // it needs. The argument is expected to mirror 'has_nullable_tuple_'.
-  DCHECK_EQ(HAS_NULLABLE_TUPLE, has_nullable_tuple_);
-  const int tuples_per_row = desc_->tuple_descriptors().size();
-  uint8_t* ptr = *data;
-  if (HAS_NULLABLE_TUPLE) {
-    // Stitch together the tuples from the page and the NULL ones.
-    const uint8_t* null_indicators = ptr;
-    ptr += NullIndicatorBytesPerRow();
-    for (int i = 0; i < tuples_per_row; ++i) {
-      // Tuple i is flagged by bit (7 - i % 8) of indicator byte i / 8; a set bit
-      // means the tuple is NULL.
-      const uint8_t* null_word = null_indicators + (i >> 3);
-      const uint32_t null_pos = i & 7;
-      const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0);
-      // Multiplying by 'is_not_null' (0 or 1) yields a NULL tuple pointer and a zero
-      // advance for NULL tuples without branching.
-      row->SetTuple(
-          i, reinterpret_cast<Tuple*>(reinterpret_cast<uint64_t>(ptr) * is_not_null));
-      ptr += fixed_tuple_sizes_[i] * is_not_null;
-    }
-  } else {
-    for (int i = 0; i < tuples_per_row; ++i) {
-      row->SetTuple(i, reinterpret_cast<Tuple*>(ptr));
-      ptr += fixed_tuple_sizes_[i];
-    }
-  }
-  *data = ptr;
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/runtime/buffered-tuple-stream-v2.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.h 
b/be/src/runtime/buffered-tuple-stream-v2.h
deleted file mode 100644
index 2023124..0000000
--- a/be/src/runtime/buffered-tuple-stream-v2.h
+++ /dev/null
@@ -1,705 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_H
-#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_H
-
-#include <set>
-#include <vector>
-#include <boost/scoped_ptr.hpp>
-#include <boost/function.hpp>
-
-#include "common/global-types.h"
-#include "common/status.h"
-#include "gutil/macros.h"
-#include "runtime/bufferpool/buffer-pool.h"
-#include "runtime/row-batch.h"
-
-namespace impala {
-
-class MemTracker;
-class RuntimeState;
-class RowDescriptor;
-class SlotDescriptor;
-class Tuple;
-class TupleRow;
-
-/// Class that provides an abstraction for a stream of tuple rows backed by 
BufferPool
-/// Pages. Rows can be added to the stream and read back. Rows are returned in 
the order
-/// they are added.
-///
-/// The BufferedTupleStream is *not* thread safe from the caller's point of 
view.
-/// Different threads should not concurrently call methods of the same 
BufferedTupleStream
-/// object.
-///
-/// Reading and writing the stream:
-/// The stream supports two modes of reading/writing, depending on whether
-/// PrepareForWrite() is called to initialize a write iterator only or
-/// PrepareForReadWrite() is called to initialize both read and write 
iterators to enable
-/// interleaved reads and writes.
-///
-/// To use write-only mode, PrepareForWrite() is called once and 
AddRow()/AddRowCustom*()
-/// are called repeatedly to initialize then advance a write iterator through 
the stream.
-/// Once the stream is fully written, it can be read back by calling 
PrepareForRead()
-/// then GetNext() repeatedly to advance a read iterator through the stream, 
or by
-/// calling GetRows() to get all of the rows at once.
-///
-/// To use read/write mode, PrepareForReadWrite() is called once to initialize 
the read
-/// and write iterators. AddRow()/AddRowCustom*() then advance a write 
iterator through
-/// the stream, and GetNext() advances a trailing read iterator through the 
stream.
-///
-/// Buffer management:
-/// The tuple stream is backed by a sequence of BufferPool Pages. The tuple 
stream uses
-/// the client's reservation to pin pages in memory. It will automatically try 
to
-/// increase the client's reservation whenever it needs to do so to make 
progress.
-///
-/// Normally pages are all of the same default page length, but larger pages 
up to the
-/// max page length are used if needed to store rows that are too large for a
-/// default-length page.
-///
-/// The stream has both pinned and unpinned modes. In the pinned mode all 
pages are
-/// pinned for reading. The pinned mode avoids I/O by keeping all pages pinned 
in memory
-/// and allows clients to save pointers to rows in the stream and randomly 
access them.
-/// E.g. hash tables can be backed by a BufferedTupleStream. In the unpinned 
mode, only
-/// pages currently being read and written are pinned and other pages are 
unpinned and
-/// therefore do not use the client's reservation and can be spilled to disk. 
The stream
-/// always holds onto a default page's worth of reservation for the read and 
write
-/// iterators (i.e. two page's worth if the stream is in read/write mode), 
even if that
-/// many pages are not currently pinned. This means that UnpinStream() always 
succeeds,
-/// and moving to the next default-length write page or read page on an 
unpinned stream
-/// does not require additional reservation. This is implemented by saving 
reservations
-/// in SubReservations.
-///
-/// To read or write a row larger than the default page size to/from an 
unpinned stream,
-/// the client must have max_page_len - default_page_len unused reservation. 
Writing a
-/// large row to an unpinned stream only uses the reservation for the duration 
of the
-/// AddRow()/AddRowCustom*() call. Reading a large row from an unpinned stream 
uses the
-/// reservation until the next call to GetNext(). E.g. to partition a single 
unpinned
-/// stream into n unpinned streams, the reservation needed is (n - 1) *
-/// default_page_len + 2 * max_page_len: one large read buffer and one large 
write
-/// buffer is needed to keep the row being processed in-memory, but only 
default-sized
-/// buffers are needed for the other streams being written.
-///
-/// The tuple stream also supports a 'delete_on_read' mode, enabled by passing 
a flag
-/// to PrepareForRead() which deletes the stream's pages as it does a final 
read
-/// pass over the stream.
-///
-/// TODO: IMPALA-4179: the buffer management can be simplified once we can 
attach
-/// buffers to RowBatches.
-///
-/// Page layout:
-/// Rows are stored back to back starting at the first byte of each page's 
buffer, with
-/// no interleaving of data from different rows. There is no padding or 
alignment
-/// between rows. Rows larger than the default page length are stored on their 
own
-/// page.
-///
-/// Tuple row layout:
-/// If the stream's tuples are nullable (i.e. has_nullable_tuple_ is true), 
there is a
-/// bitstring at the start of each row with null indicators for all tuples in 
each row
-/// (including non-nullable tuples). The bitstring occupies 
ceil(num_tuples_per_row / 8)
-/// bytes. A 1 indicates the tuple is null.
-///
-/// The fixed length parts of the row's tuples are stored first, followed by 
var len data
-/// for inlined_string_slots_ and inlined_coll_slots_. Other "external" var 
len slots can
-/// point to var len data outside the stream. When reading the stream, the 
length of each
-/// row's var len data in the stream must be computed to find the next row's 
start.
-///
-/// The tuple stream supports reading from the stream into RowBatches without 
copying
-/// out any data: the RowBatches' Tuple pointers will point directly into the 
stream's
-/// pages' buffers. The fixed length parts follow Impala's internal tuple 
format, so for
-/// the tuple to be valid, we only need to update pointers to point to the var 
len data
-/// in the stream. These pointers need to be updated by the stream because a 
spilled
-/// page's data may be relocated to a different buffer. The pointers are 
updated lazily
-/// upon reading the stream via GetNext() or GetRows().
-///
-/// Example layout for a row with two non-nullable tuples ((1, "hello"), (2, 
"world"))
-/// with all var len data stored in the stream:
-///  <---- tuple 1 -----> <------ tuple 2 ------> <- var len -> <- next row ...
-/// +--------+-----------+-----------+-----------+-------------+
-/// | IntVal | StringVal | BigIntVal | StringVal |             | ...
-/// +--------+-----------+-----------+-----------++------------+
-/// | val: 1 | len: 5    | val: 2    | len: 5    | helloworld  | ...
-/// |        | ptr: 0x.. |           | ptr: 0x.. |             | ...
-/// +--------+-----------+-----------+-----------+-------------+
-///  <--4b--> <---12b---> <----8b---> <---12b---> <----10b---->
-///
-/// Example layout for a row with the second tuple nullable ((1, "hello"), 
NULL)
-/// with all var len data stored in the stream:
-/// <- null tuple bitstring -> <---- tuple 1 -----> <- var len -> <- next row 
...
-/// +-------------------------+--------+-----------+------------+
-/// |                         | IntVal | StringVal |            | ...
-/// +-------------------------+--------+-----------+------------+
-/// | 0000 0010               | val: 1 | len: 5    | hello      | ...
-/// |                         |        | ptr: 0x.. |            | ...
-/// +-------------------------+--------+-----------+------------+
-///  <---------1b------------> <--4b--> <---12b---> <----5b---->
-///
-/// Example layout for a row with a single non-nullable tuple (("hello", 
"world")) with
-/// the second string slot stored externally to the stream:
-///  <------ tuple 1 ------> <- var len ->  <- next row ...
-/// +-----------+-----------+-------------+
-/// | StringVal | StringVal |             | ...
-/// +-----------+-----------+-------------+
-/// | len: 5    | len: 5    |  hello      | ...
-/// | ptr: 0x.. | ptr: 0x.. |             | ...
-/// +-----------+-----------+-------------+
-///  <---12b---> <---12b---> <-----5b---->
-///
-/// The behavior of reads and writes is as follows:
-/// Read:
-///   1. Unpinned: Only a single read page is pinned at a time. This means 
that only
-///     enough reservation to pin a single page is needed to read the stream, 
regardless
-///     of the stream's size. Each page is deleted or unpinned (if delete on 
read is true
-///     or false respectively) before advancing to the next page.
-///   2. Pinned: All pages in the stream are pinned so do not need to be 
pinned or
-///     unpinned when reading from the stream. If delete on read is true, 
pages are
-///     deleted after being read. If the stream was previously unpinned, the 
page's data
-///     may not yet be in memory - reading from the stream can block on I/O or 
fail with
-///     an I/O error.
-/// Write:
-///   1. Unpinned: Unpin pages as they fill up. This means that only enough reservation
-///     to pin a single write page is required to write to the stream, regardless of the
-///     stream's size.
-///   2. Pinned: Pages are left pinned. If the next page in the stream cannot be pinned
-///     because the client's reservation is insufficient (and could not be increased by
-///     the stream), the write call will fail and the client can either unpin the stream
-///     or free up other memory before retrying.
-///
-/// Memory lifetime of rows read from stream:
-/// If the stream is pinned and delete on read is false, it is valid to access 
any tuples
-/// returned via GetNext() or GetRows() until the stream is unpinned. If the 
stream is
-/// unpinned or delete on read is true, then the batch returned from GetNext() 
may have
-/// the needs_deep_copy flag set, which means that any tuple memory returned 
so far from
-/// the stream may be freed on the next call to GetNext().
-/// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers 
to the batch.
-///
-/// Manual construction of rows with AddRowCustomBegin()/AddRowCustomEnd():
-/// The BufferedTupleStream supports allocation of uninitialized rows with
-/// AddRowCustom*(). AddRowCustomBegin() is called instead of AddRow() if the 
client wants
-/// to manually construct a row. The caller of AddRowCustomBegin() is 
responsible for
-/// writing the row with exactly the layout described above then calling
-/// AddRowCustomEnd() when done.
-///
-/// If a caller constructs a tuple in this way, the caller can set the 
pointers and they
-/// will not be modified until the stream is read via GetNext() or GetRows().
-/// TODO: IMPALA-5007: try to remove AddRowCustom*() by unifying with AddRow().
-///
-/// TODO: we need to be able to do read ahead for pages. We need some way to 
indicate a
-/// page will need to be pinned soon.
-class BufferedTupleStreamV2 {
- public:
-  /// A pointer to the start of a flattened TupleRow in the stream.
-  typedef uint8_t* FlatRowPtr;
-
-  /// row_desc: description of rows stored in the stream. This is the desc for 
rows
-  /// that are added and the rows being returned.
-  /// page_len: the size of pages to use in the stream
-  /// ext_varlen_slots: set of varlen slots with data stored externally to the 
stream
-  BufferedTupleStreamV2(RuntimeState* state, const RowDescriptor* row_desc,
-      BufferPool::ClientHandle* buffer_pool_client, int64_t default_page_len,
-      int64_t max_page_len,
-      const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>());
-
-  virtual ~BufferedTupleStreamV2();
-
-  /// Initializes the tuple stream object on behalf of node 'node_id'. Must be 
called
-  /// once before any of the other APIs.
-  /// If 'pinned' is true, the tuple stream starts off pinned, otherwise it is 
unpinned.
-  /// 'node_id' is only used for error reporting.
-  Status Init(int node_id, bool pinned) WARN_UNUSED_RESULT;
-
-  /// Prepares the stream for writing by saving enough reservation for a 
default-size
-  /// write page. Tries to increase reservation if there is not enough unused 
reservation
-  /// for a page. Called after Init() and before the first AddRow() or
-  /// AddRowCustomBegin() call.
-  /// 'got_reservation': set to true if there was enough reservation to 
initialize the
-  ///     first write page and false if there was not enough reservation and 
no other
-  ///     error was encountered. Undefined if an error status is returned.
-  Status PrepareForWrite(bool* got_reservation) WARN_UNUSED_RESULT;
-
-  /// Prepares the stream for interleaved reads and writes by saving enough 
reservation
-  /// for default-sized read and write pages. Called after Init() and before 
the first
-  /// AddRow() or AddRowCustomBegin() call.
-  /// 'delete_on_read': Pages are deleted after they are read.
-  /// 'got_reservation': set to true if there was enough reservation to 
initialize the
-  ///     read and write pages and false if there was not enough reservation 
and no other
-  ///     error was encountered. Undefined if an error status is returned.
-  Status PrepareForReadWrite(
-      bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT;
-
-  /// Prepares the stream for reading, invalidating the write iterator (if 
there is one).
-  /// Therefore must be called after the last AddRow() or AddRowCustomEnd() 
and before
-  /// GetNext(). PrepareForRead() can be called multiple times to do multiple 
read passes
-  /// over the stream, unless rows were read from the stream after 
PrepareForRead() or
-  /// PrepareForReadWrite() was called with delete_on_read = true.
-  /// 'delete_on_read': Pages are deleted after they are read.
-  /// 'got_reservation': set to true if there was enough reservation to 
initialize the
-  ///     first read page and false if there was not enough reservation and no 
other
-  ///     error was encountered. Undefined if an error status is returned.
-  Status PrepareForRead(bool delete_on_read, bool* got_reservation) 
WARN_UNUSED_RESULT;
-
-  /// Adds a single row to the stream. There are four possible outcomes:
-  /// a) The append succeeds. True is returned.
-  /// b) The append fails because the unused reservation was not sufficient to 
add
-  ///   a new page to the stream large enough to fit 'row' and the stream 
could not
-  ///   increase the reservation to get enough unused reservation. Returns 
false and
-  ///   sets 'status' to OK. The append can be retried after freeing up memory 
or
-  ///   unpinning the stream.
-  /// c) The append fails with a runtime error. Returns false and sets 
'status' to an
-  ///   error.
-  /// d) The append fails because the row is too large to fit in a page of a stream.
-  ///   Returns false and sets 'status' to an error.
-  ///
-  /// Unpinned streams can only encounter case b) when appending a row larger 
than
-  /// the default page size and the reservation could not be increased 
sufficiently.
-  /// Otherwise enough memory is automatically freed up by unpinning the 
current write
-  /// page.
-  ///
-  /// BufferedTupleStream will do a deep copy of the memory in the row. After 
AddRow()
-  /// returns an error, it should not be called again.
-  bool AddRow(TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT;
-
-  /// Allocates space to store a row of 'size' bytes (including fixed and variable length
-  /// data). If successful, returns a pointer to the allocated row. The caller must then
-  /// write valid data to the row and call AddRowCustomEnd().
-  ///
-  /// If unsuccessful, returns nullptr. The failure modes are the same as 
described in the
-  /// AddRow() comment.
-  ALWAYS_INLINE uint8_t* AddRowCustomBegin(int64_t size, Status* status);
-
-  /// Called after AddRowCustomBegin() when done writing the row. Only should 
be called
-  /// if AddRowCustomBegin() succeeded. See the AddRowCustomBegin() comment for
-  /// explanation.
-  /// 'size': the size passed into AddRowCustomBegin().
-  void AddRowCustomEnd(int64_t size);
-
-  /// Unflattens 'flat_row' into a regular TupleRow 'row'. Only valid to call 
if the
-  /// stream is pinned. The row must have been allocated with the stream's row 
desc.
-  /// The returned 'row' is backed by memory from the stream so is only valid 
as long
-  /// as the stream is pinned.
-  void GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const;
-
-  /// Pins all pages in this stream and switches to pinned mode. Has no effect 
if the
-  /// stream is already pinned.
-  /// If the current unused reservation is not sufficient to pin the stream in 
memory,
-  /// this will try to increase the reservation. If that fails, 'pinned' is 
set to false
-  /// and the stream is left unpinned. Otherwise 'pinned' is set to true.
-  Status PinStream(bool* pinned) WARN_UNUSED_RESULT;
-
-  /// Modes for UnpinStream().
-  enum UnpinMode {
-    /// All pages in the stream are unpinned and the read/write positions in 
the stream
-    /// are reset. No more rows can be written to the stream after this. The 
stream can
-    /// be re-read from the beginning by calling PrepareForRead().
-    UNPIN_ALL,
-    /// All pages are unpinned aside from the current read and write pages (if any),
-    /// which are left in the same state. The unpinned stream can continue being read
-    /// or written from the current read or write positions.
-    UNPIN_ALL_EXCEPT_CURRENT,
-  };
-
-  /// Unpins stream with the given 'mode' as described above.
-  void UnpinStream(UnpinMode mode);
-
-  /// Get the next batch of output rows, which are backed by the stream's 
memory.
-  /// If the stream is unpinned or 'delete_on_read' is true, the 
'needs_deep_copy'
-  /// flag may be set on 'batch' to signal that memory will be freed on the 
next
-  /// call to GetNext() and that the caller should copy out any data it needs 
from
-  /// rows in 'batch' or in previous batches returned from GetNext().
-  ///
-  /// If the stream is pinned and 'delete_on_read' is false, the memory 
backing the
-  /// rows will remain valid until the stream is unpinned, destroyed, etc.
-  /// TODO: IMPALA-4179: update when we simplify the memory transfer model.
-  Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT;
-
-  /// Same as above, but populate 'flat_rows' with a pointer to the flat 
version of
-  /// each returned row in the pinned stream. The pointers in 'flat_rows' are 
only
-  /// valid as long as the stream remains pinned.
-  Status GetNext(
-      RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows) 
WARN_UNUSED_RESULT;
-
-  /// Returns all the rows in the stream in batch. This pins the entire stream 
in the
-  /// process. If the current unused reservation is not sufficient to pin the 
stream in
-  /// memory, this will try to increase the reservation. If that fails, 
'got_rows' is set
-  /// to false.
-  Status GetRows(MemTracker* tracker, boost::scoped_ptr<RowBatch>* batch,
-      bool* got_rows) WARN_UNUSED_RESULT;
-
-  /// Must be called once at the end to cleanup all resources. If 'batch' is 
non-NULL,
-  /// attaches buffers from pinned pages that rows returned from GetNext() may 
reference.
-  /// Otherwise deletes all pages. Does nothing if the stream was already 
closed. The
-  /// 'flush' mode is forwarded to RowBatch::AddBuffer() when attaching 
buffers.
-  void Close(RowBatch* batch, RowBatch::FlushMode flush);
-
-  /// Number of rows in the stream, including rows already deleted during a destructive 'delete_on_read' pass.
-  int64_t num_rows() const { return num_rows_; }
-
-  /// Number of rows returned via GetNext() since the last PrepareForRead() call.
-  int64_t rows_returned() const { return rows_returned_; }
-
-  /// Returns the byte size necessary to store the entire stream in memory (counts pages already deleted in 'delete_on_read' mode).
-  int64_t byte_size() const { return total_byte_size_; }
-
-  /// Returns the number of bytes currently pinned in memory by the stream, as tracked in 'bytes_pinned_'.
-  /// If 'ignore_current' is true and 'write_page_' is non-null and pinned, its length is not included.
-  int64_t BytesPinned(bool ignore_current) const {
-    if (ignore_current && write_page_ != nullptr && write_page_->is_pinned()) {
-      return bytes_pinned_ - write_page_->len(); // Leave out the pinned write page.
-    }
-    return bytes_pinned_;
-  }
-
-  bool is_closed() const { return closed_; } // Returns the 'closed_' flag.
-  bool is_pinned() const { return pinned_; } // True if the stream is currently pinned ('pinned_').
-  bool has_read_iterator() const { return has_read_iterator_; } // True if a read iterator is currently active.
-  bool has_write_iterator() const { return has_write_iterator_; } // True if a write iterator is currently active.
-
-  std::string DebugString() const;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(BufferedTupleStreamV2);
-  friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test;
-  friend class ArrayTupleStreamTest_TestComputeRowSize_Test;
-  friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test;
-  friend class SimpleTupleStreamTest_TestGetRowsOverflow_Test;
-
-  /// Wrapper around BufferPool::PageHandle that tracks additional info about 
the page.
-  struct Page {
-    Page() : num_rows(0), retrieved_buffer(true) {} // Starts with zero rows and 'retrieved_buffer' set.
-
-    inline int len() const { return handle.len(); } // Forwards to 'handle'. NOTE(review): narrows to int if handle.len() is wider - confirm.
-    inline bool is_pinned() const { return handle.is_pinned(); } // Forwards to 'handle'.
-    inline int pin_count() const { return handle.pin_count(); } // Forwards to 'handle'.
-    Status GetBuffer(const BufferPool::BufferHandle** buffer) { // Gets the page's buffer from 'handle' via the 'buffer' out-parameter.
-      RETURN_IF_ERROR(handle.GetBuffer(buffer)); // Presumably returns early on a non-OK status - confirm macro semantics.
-      retrieved_buffer = true; // Only reached if the line above did not return.
-      return Status::OK();
-    }
-    std::string DebugString() const;
-
-    BufferPool::PageHandle handle;
-
-    /// Number of rows written to the page.
-    int num_rows;
-
-    /// Whether we called GetBuffer() on the page since it was last pinned. 
This means
-    /// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() 
may have
-    /// returned rows referencing the page's buffer.
-    bool retrieved_buffer;
-  };
-
-  /// Runtime state instance used to check for cancellation. Not owned.
-  RuntimeState* const state_;
-
-  /// Description of rows stored in the stream.
-  const RowDescriptor* desc_;
-
-  /// Plan node ID, used for error reporting.
-  int node_id_;
-
-  /// The size of the fixed length portion for each tuple in the row.
-  std::vector<int> fixed_tuple_sizes_;
-
-  /// Vectors of all the string slots that have their varlen data stored in the stream,
-  /// grouped by tuple_idx.
-  std::vector<std::pair<int, std::vector<SlotDescriptor*>>> 
inlined_string_slots_;
-
-  /// Vectors of all the collection slots that have their varlen data stored 
in the
-  /// stream, grouped by tuple_idx.
-  std::vector<std::pair<int, std::vector<SlotDescriptor*>>> 
inlined_coll_slots_;
-
-  /// Buffer pool and client used to allocate, pin and release pages. Not 
owned.
-  BufferPool* buffer_pool_;
-  BufferPool::ClientHandle* buffer_pool_client_;
-
-  /// List of pages in the stream.
-  /// Empty iff one of two cases applies:
-  /// * before the first row has been added with AddRow() or AddRowCustom().
-  /// * after the stream has been destructively read in 'delete_on_read' mode
-  std::list<Page> pages_;
-  // IMPALA-5629: avoid O(n) list.size() call by explicitly tracking the 
number of pages.
-  // TODO: remove when we switch to GCC5+, where list.size() is O(1). See GCC 
bug #49561.
-  int64_t num_pages_;
-
-  /// Total size of pages_, including any pages already deleted in 
'delete_on_read'
-  /// mode.
-  int64_t total_byte_size_;
-
-  /// True if there is currently an active read iterator for the stream.
-  bool has_read_iterator_;
-
-  /// The current page being read. When no read iterator is active, equal to pages_.end().
-  /// When a read iterator is active, either points to the current read page, or equals
-  /// pages_.end() if no rows have yet been read. GetNext() does not advance this past
-  /// the end of the stream, so upon eos 'read_page_' points to the last page and
-  /// rows_returned_ == num_rows_. Always pinned, unless a Pin() call failed and an error
-  /// status was returned.
-  std::list<Page>::iterator read_page_;
-
-  /// Saved reservation for read iterator. 'default_page_len_' reservation is 
saved if
-  /// there is a read iterator, no pinned read page, and the possibility that 
the read
-  /// iterator will advance to a valid page.
-  BufferPool::SubReservation read_page_reservation_;
-
-  /// Number of rows returned from the current read_page_.
-  uint32_t read_page_rows_returned_;
-
-  /// Pointer into read_page_ to the byte after the last row read.
-  uint8_t* read_ptr_;
-
-  /// Pointer to one byte past the end of read_page_. Used to detect overruns.
-  const uint8_t* read_end_ptr_;
-
-  /// Pointer into write_page_ to the byte after the last row written.
-  uint8_t* write_ptr_;
-
-  /// Pointer to one byte past the end of write_page_. Cached to speed up 
computation
-  const uint8_t* write_end_ptr_;
-
-  /// Number of rows returned to the caller from GetNext() since the last
-  /// PrepareForRead() call.
-  int64_t rows_returned_;
-
-  /// True if there is currently an active write iterator into the stream.
-  bool has_write_iterator_;
-
-  /// The current page for writing. NULL if there is no write iterator or no 
current
-  /// write page. Always pinned. Size is 'default_page_len_', except 
temporarily while
-  /// appending a larger row between AddRowCustomBegin() and AddRowCustomEnd().
-  Page* write_page_;
-
-  /// Saved reservation for write iterator. 'default_page_len_' reservation is 
saved if
-  /// there is a write iterator, no page currently pinned for writing and the 
possibility
-  /// that a pin count will be needed for the write iterator in future. 
Specifically if:
-  /// * no rows have been appended to the stream and 'pages_' is empty, or
-  /// * the stream is unpinned, 'write_page_' is null and the last page in 'pages_'
-  ///   is a large page that we advanced past, or
-  /// * there is only one pinned page in the stream and it is already pinned 
for reading.
-  BufferPool::SubReservation write_page_reservation_;
-
-  /// Total bytes of pinned pages in pages_, stored to avoid iterating over 
the list
-  /// to compute it.
-  int64_t bytes_pinned_;
-
-  /// Number of rows stored in the stream. Includes rows that were already 
deleted during
-  /// a destructive 'delete_on_read' pass over the stream.
-  int64_t num_rows_;
-
-  /// The default length in bytes of pages used to store the stream's rows. All rows that
-  /// fit in a default-sized page are stored in a default-sized page.
-  const int64_t default_page_len_;
-
-  /// The maximum length in bytes of pages used to store the stream's rows. 
This is a
-  /// hard limit on the maximum size of row that can be stored in the stream 
and the
-  /// amount of reservation required to read or write to an unpinned stream.
-  const int64_t max_page_len_;
-
-  /// Whether any tuple in the rows is nullable.
-  const bool has_nullable_tuple_;
-
-  /// If true, pages are deleted after they are read during this read pass. 
Once rows
-  /// have been read from a stream with 'delete_on_read_' true, this is always 
true.
-  bool delete_on_read_;
-
-  bool closed_; // Used for debugging.
-
-  /// If true, this stream has been explicitly pinned by the caller and all 
pages are
-  /// kept pinned until the caller calls UnpinStream().
-  bool pinned_;
-
-  bool is_read_page(const Page* page) const { // True iff 'read_page_' refers to a page and that page is 'page'.
-    return read_page_ != pages_.end() && &*read_page_ == page; // The end() check comes first so the iterator is only dereferenced when valid.
-  }
-
-  bool is_write_page(const Page* page) const { return write_page_ == page; } // Pointer comparison against 'write_page_'.
-
-  /// Return true if there is a write page and it is also the current read page.
-  bool has_read_write_page() const {
-    return write_page_ != nullptr && is_read_page(write_page_);
-  }
-
-  /// The slow path for AddRow() that is called if there is not sufficient 
space in
-  /// the current page.
-  bool AddRowSlow(TupleRow* row, Status* status) noexcept;
-
-  /// The slow path for AddRowCustomBegin() that is called if there is not 
sufficient space in
-  /// the current page.
-  uint8_t* AddRowCustomBeginSlow(int64_t size, Status* status) noexcept;
-
-  /// The slow path for AddRowCustomEnd() that is called for large pages.
-  void AddLargeRowCustomEnd(int64_t size) noexcept;
-
-  /// Copies 'row' into the buffer starting at *data and ending at the byte 
before
-  /// 'data_end'. On success, returns true and updates *data to point after 
the last
-  /// byte written. Returns false if there is not enough space in the buffer 
provided.
-  bool DeepCopy(TupleRow* row, uint8_t** data, const uint8_t* data_end) 
noexcept;
-
-  /// Templated implementation of DeepCopy().
-  template <bool HAS_NULLABLE_TUPLE>
-  bool DeepCopyInternal(TupleRow* row, uint8_t** data, const uint8_t* 
data_end) noexcept;
-
-  /// Helper function to copy strings in string_slots from tuple into *data.
-  /// Updates *data to the end of the string data added. Returns false if the 
data
-  /// does not fit in the buffer [*data, data_end).
-  static bool CopyStrings(const Tuple* tuple,
-      const std::vector<SlotDescriptor*>& string_slots, uint8_t** data,
-      const uint8_t* data_end);
-
-  /// Helper function to deep copy collections in collection_slots from tuple 
into
-  /// the buffer [*data, data_end). Updates *data to the end of the collection 
data
-  /// added. Returns false if the data does not fit in the buffer.
-  static bool CopyCollections(const Tuple* tuple,
-      const std::vector<SlotDescriptor*>& collection_slots, uint8_t** data,
-      const uint8_t* data_end);
-
-  /// Gets a new page of 'page_len' bytes from buffer_pool_, updating 
write_page_,
-  /// write_ptr_ and write_end_ptr_. The caller must ensure there is 
'page_len' unused
-  /// reservation. The caller must reset the write page (if there is one) 
before calling.
-  Status NewWritePage(int64_t page_len) noexcept WARN_UNUSED_RESULT;
-
-  /// Determines what page size is needed to fit a row of 'row_size' bytes.
-  /// Returns an error if the row cannot fit in a page.
-  Status CalcPageLenForRow(int64_t row_size, int64_t* page_len);
-
-  /// Wrapper around NewWritePage() that allocates a new write page that fits 
a row of
-  /// 'row_size' bytes. Increases reservation if needed to allocate the next 
page.
-  /// Returns OK and sets 'got_reservation' to true if the write page was 
successfully
-  /// allocated. Returns an error if the row cannot fit in a page. Returns OK 
and sets
-  /// 'got_reservation' to false if the reservation could not be increased and 
no other
-  /// error was encountered.
-  Status AdvanceWritePage(
-      int64_t row_size, bool* got_reservation) noexcept WARN_UNUSED_RESULT;
-
-  /// Reset the write page, if there is one, and unpin pages accordingly. If 
there
-  /// is an active write iterator, the next row will be appended to a new page.
-  void ResetWritePage();
-
-  /// Invalidate the write iterator and release any resources associated with 
it. After
-  /// calling this, no more rows can be appended to the stream.
-  void InvalidateWriteIterator();
-
-  /// Same as PrepareForRead(), except the iterators are not invalidated and
-  /// the caller is assumed to have checked there is sufficient unused 
reservation.
-  Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT;
-
-  /// Pins the next read page. This blocks reading from disk if necessary to 
bring the
-  /// page's data into memory. Updates read_page_, read_ptr_, and
-  /// read_page_rows_returned_.
-  Status NextReadPage() WARN_UNUSED_RESULT;
-
-  /// Invalidate the read iterator, and release any resources associated with 
the active
-  /// iterator.
-  void InvalidateReadIterator();
-
-  /// Returns the total additional bytes that this row will consume in 
write_page_ if
-  /// appended to the page. This includes the row's null indicators, the fixed 
length
-  /// part of the row and the data for inlined_string_slots_ and 
inlined_coll_slots_.
-  int64_t ComputeRowSize(TupleRow* row) const noexcept;
-
-  /// Pins page and updates tracking stats.
-  Status PinPage(Page* page) WARN_UNUSED_RESULT;
-
-  /// Increment the page's pin count if this page needs a higher pin count 
given the
-  /// current read and write iterator positions and whether the stream will be 
pinned
-  /// ('stream_pinned'). Assumes that no scenarios occur when the pin count 
needs to
-  /// be incremented multiple times. The caller is responsible for ensuring 
sufficient
-  /// reservation is available.
-  Status PinPageIfNeeded(Page* page, bool stream_pinned) WARN_UNUSED_RESULT;
-
-  /// Decrement the page's pin count if this page needs a lower pin count 
given the
-  /// current read and write iterator positions and whether the stream will be 
pinned
-  /// ('stream_pinned'). Assumes that no scenarios occur when the pin count 
needs to
-  /// be decremented multiple times.
-  void UnpinPageIfNeeded(Page* page, bool stream_pinned);
-
-  /// Return the expected pin count for 'page' in the current stream based on 
the current
-  /// read and write pages and whether the stream is pinned.
-  int ExpectedPinCount(bool stream_pinned, const Page* page) const;
-
-  /// Return true if the stream in its current state needs to have a 
reservation for
-  /// a write page stored in 'write_page_reservation_'.
-  bool NeedWriteReservation() const;
-
-  /// Same as above, except assume the stream's 'pinned_' state is 
'stream_pinned'.
-  bool NeedWriteReservation(bool stream_pinned) const;
-
-  /// Same as above, except assume the stream has 'num_pages' pages and 
different
-  /// iterator state.
-  static bool NeedWriteReservation(bool stream_pinned, int64_t num_pages,
-      bool has_write_iterator, bool has_write_page, bool has_read_write_page);
-
-  /// Return true if the stream in its current state needs to have a 
reservation for
-  /// a read page stored in 'read_page_reservation_'.
-  bool NeedReadReservation() const;
-
-  /// Same as above, except assume the stream's 'pinned_' state is 
'stream_pinned'.
-  bool NeedReadReservation(bool stream_pinned) const;
-
-  /// Same as above, except assume the stream has 'num_pages' pages and a 
different
-  /// read iterator state.
-  bool NeedReadReservation(bool stream_pinned, int64_t num_pages, bool 
has_read_iterator,
-      bool has_read_page) const;
-
-  /// Same as above, except assume the stream has 'num_pages' pages and a 
different
-  /// write iterator state.
-  static bool NeedReadReservation(bool stream_pinned, int64_t num_pages,
-      bool has_read_iterator, bool has_read_page, bool has_write_iterator,
-      bool has_write_page);
-
-  /// Templated GetNext implementations.
-  template <bool FILL_FLAT_ROWS>
-  Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* 
flat_rows);
-  template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE>
-  Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* 
flat_rows);
-
-  /// Helper function to convert a flattened TupleRow stored starting at 
'*data' into
-  /// 'row'. *data is updated to point to the first byte past the end of the 
row.
-  template <bool HAS_NULLABLE_TUPLE>
-  void UnflattenTupleRow(uint8_t** data, TupleRow* row) const;
-
-  /// Helper function for GetNextInternal(). For each string slot in string_slots,
-  /// update StringValue's ptr field to point to the corresponding string data stored
-  /// inline in the stream (at the current value of read_ptr_) and advance read_ptr_ by
-  /// the StringValue's length field.
-  void FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots, Tuple* 
tuple);
-
-  /// Helper function for GetNextInternal(). For each collection slot in 
collection_slots,
-  /// recursively update any pointers in the CollectionValue to point to the 
corresponding
-  /// var len data stored inline in the stream, advancing read_ptr_ as data is 
read.
-  /// Assumes that the collection was serialized to the stream in DeepCopy()'s 
format.
-  void FixUpCollectionsForRead(
-      const vector<SlotDescriptor*>& collection_slots, Tuple* tuple);
-
-  /// Returns the number of null indicator bytes per row. Only valid if this 
stream has
-  /// nullable tuples.
-  int NullIndicatorBytesPerRow() const;
-
-  /// Returns the total bytes pinned. Only called in DCHECKs to validate 
bytes_pinned_.
-  int64_t CalcBytesPinned() const;
-
-  /// DCHECKs if the stream is internally inconsistent. The stream should 
always be in
-  /// a consistent state after returning success from a public API call. The 
Fast version
-  /// has constant runtime and does not check all of 'pages_'. The Full 
version includes
-  /// O(n) checks that require iterating over the whole 'pages_' list (e.g. 
checking that
-  /// each page is in a valid state).
-  void CheckConsistencyFast() const;
-  void CheckConsistencyFull() const;
-  void CheckPageConsistency(const Page* page) const;
-};
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/runtime/buffered-tuple-stream-v2.inline.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.inline.h 
b/be/src/runtime/buffered-tuple-stream-v2.inline.h
deleted file mode 100644
index 7022249..0000000
--- a/be/src/runtime/buffered-tuple-stream-v2.inline.h
+++ /dev/null
@@ -1,56 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_INLINE_H
-#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_INLINE_H
-
-#include "runtime/buffered-tuple-stream-v2.h"
-
-#include "runtime/descriptors.h"
-#include "runtime/tuple-row.h"
-#include "util/bit-util.h"
-
-namespace impala {
-
-inline int BufferedTupleStreamV2::NullIndicatorBytesPerRow() const {
-  DCHECK(has_nullable_tuple_); // Only meant to be called when the stream has nullable tuples.
-  return BitUtil::RoundUpNumBytes(fixed_tuple_sizes_.size()); // Input is the number of tuples per row; presumably treated as a bit count - TODO confirm.
-}
-
-inline uint8_t* BufferedTupleStreamV2::AddRowCustomBegin(int64_t size, Status* 
status) {
-  DCHECK(!closed_);
-  DCHECK(has_write_iterator());
-  if (UNLIKELY(write_page_ == nullptr || write_ptr_ + size > write_end_ptr_)) { // No write page, or 'size' bytes do not fit in the rest of it.
-    return AddRowCustomBeginSlow(size, status); // Slow path for insufficient space in the current page.
-  }
-  DCHECK(write_page_ != nullptr);
-  DCHECK(write_page_->is_pinned());
-  DCHECK_LE(write_ptr_ + size, write_end_ptr_);
-  ++num_rows_; // Fast path: count the row in both the stream and the page.
-  ++write_page_->num_rows;
-
-  uint8_t* data = write_ptr_; // Start of the 'size' bytes handed back to the caller.
-  write_ptr_ += size; // Advance the write position past those bytes.
-  return data;
-}
-
-inline void BufferedTupleStreamV2::AddRowCustomEnd(int64_t size) { // Does nothing unless 'size' exceeds 'default_page_len_'.
-  if (UNLIKELY(size > default_page_len_)) AddLargeRowCustomEnd(size); // Rows bigger than a default page go through the large-row slow path.
-}
-}
-
-#endif

Reply via email to