http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/runtime/buffered-tuple-stream.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc new file mode 100644 index 0000000..38dc44c --- /dev/null +++ b/be/src/runtime/buffered-tuple-stream.cc @@ -0,0 +1,1084 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "runtime/buffered-tuple-stream.inline.h" + +#include <boost/bind.hpp> +#include <gutil/strings/substitute.h> + +#include "runtime/bufferpool/reservation-tracker.h" +#include "runtime/collection-value.h" +#include "runtime/descriptors.h" +#include "runtime/exec-env.h" +#include "runtime/mem-tracker.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" +#include "runtime/string-value.h" +#include "runtime/tuple-row.h" +#include "util/bit-util.h" +#include "util/debug-util.h" +#include "util/runtime-profile-counters.h" + +#include "common/names.h" + +#ifdef NDEBUG +#define CHECK_CONSISTENCY_FAST() +#define CHECK_CONSISTENCY_FULL() +#else +#define CHECK_CONSISTENCY_FAST() CheckConsistencyFast() +#define CHECK_CONSISTENCY_FULL() CheckConsistencyFull() +#endif + +using namespace impala; +using namespace strings; + +using BufferHandle = BufferPool::BufferHandle; + +BufferedTupleStream::BufferedTupleStream(RuntimeState* state, + const RowDescriptor* row_desc, BufferPool::ClientHandle* buffer_pool_client, + int64_t default_page_len, int64_t max_page_len, const set<SlotId>& ext_varlen_slots) + : state_(state), + desc_(row_desc), + node_id_(-1), + buffer_pool_(state->exec_env()->buffer_pool()), + buffer_pool_client_(buffer_pool_client), + num_pages_(0), + total_byte_size_(0), + has_read_iterator_(false), + read_page_reservation_(buffer_pool_client_), + read_page_rows_returned_(-1), + read_ptr_(nullptr), + read_end_ptr_(nullptr), + write_ptr_(nullptr), + write_end_ptr_(nullptr), + rows_returned_(0), + has_write_iterator_(false), + write_page_(nullptr), + write_page_reservation_(buffer_pool_client_), + bytes_pinned_(0), + num_rows_(0), + default_page_len_(default_page_len), + max_page_len_(max_page_len), + has_nullable_tuple_(row_desc->IsAnyTupleNullable()), + delete_on_read_(false), + closed_(false), + pinned_(true) { + DCHECK_GE(max_page_len, default_page_len); + DCHECK(BitUtil::IsPowerOf2(default_page_len)) << default_page_len; + 
DCHECK(BitUtil::IsPowerOf2(max_page_len)) << max_page_len; + read_page_ = pages_.end(); + for (int i = 0; i < desc_->tuple_descriptors().size(); ++i) { + const TupleDescriptor* tuple_desc = desc_->tuple_descriptors()[i]; + const int tuple_byte_size = tuple_desc->byte_size(); + fixed_tuple_sizes_.push_back(tuple_byte_size); + + vector<SlotDescriptor*> tuple_string_slots; + vector<SlotDescriptor*> tuple_coll_slots; + for (int j = 0; j < tuple_desc->slots().size(); ++j) { + SlotDescriptor* slot = tuple_desc->slots()[j]; + if (!slot->type().IsVarLenType()) continue; + if (ext_varlen_slots.find(slot->id()) == ext_varlen_slots.end()) { + if (slot->type().IsVarLenStringType()) { + tuple_string_slots.push_back(slot); + } else { + DCHECK(slot->type().IsCollectionType()); + tuple_coll_slots.push_back(slot); + } + } + } + if (!tuple_string_slots.empty()) { + inlined_string_slots_.push_back(make_pair(i, tuple_string_slots)); + } + + if (!tuple_coll_slots.empty()) { + inlined_coll_slots_.push_back(make_pair(i, tuple_coll_slots)); + } + } +} + +BufferedTupleStream::~BufferedTupleStream() { + DCHECK(closed_); +} + +void BufferedTupleStream::CheckConsistencyFull() const { + CheckConsistencyFast(); + // The below checks require iterating over all the pages in the stream. + DCHECK_EQ(bytes_pinned_, CalcBytesPinned()) << DebugString(); + DCHECK_EQ(pages_.size(), num_pages_) << DebugString(); + for (const Page& page : pages_) CheckPageConsistency(&page); +} + +void BufferedTupleStream::CheckConsistencyFast() const { + // All the below checks should be O(1). + DCHECK(has_write_iterator() || write_page_ == nullptr); + if (write_page_ != nullptr) { + CheckPageConsistency(write_page_); + DCHECK(write_page_->is_pinned()); + DCHECK(write_page_->retrieved_buffer); + const BufferHandle* write_buffer; + Status status = write_page_->GetBuffer(&write_buffer); + DCHECK(status.ok()); // Write buffer should never have been unpinned. 
+ DCHECK_GE(write_ptr_, write_buffer->data()); + DCHECK_EQ(write_end_ptr_, write_buffer->data() + write_page_->len()); + DCHECK_GE(write_end_ptr_, write_ptr_); + } + DCHECK(has_read_iterator() || read_page_ == pages_.end()); + if (read_page_ != pages_.end()) { + CheckPageConsistency(&*read_page_); + DCHECK(read_page_->is_pinned()); + DCHECK(read_page_->retrieved_buffer); + // Can't check read buffer without affecting behaviour, because a read may be in + // flight and this would required blocking on that write. + DCHECK_GE(read_end_ptr_, read_ptr_); + } + if (NeedReadReservation()) { + DCHECK_EQ(default_page_len_, read_page_reservation_.GetReservation()) + << DebugString(); + } else if (!read_page_reservation_.is_closed()) { + DCHECK_EQ(0, read_page_reservation_.GetReservation()); + } + if (NeedWriteReservation()) { + DCHECK_EQ(default_page_len_, write_page_reservation_.GetReservation()); + } else if (!write_page_reservation_.is_closed()) { + DCHECK_EQ(0, write_page_reservation_.GetReservation()); + } +} + +void BufferedTupleStream::CheckPageConsistency(const Page* page) const { + DCHECK_EQ(ExpectedPinCount(pinned_, page), page->pin_count()) << DebugString(); + // Only one large row per page. + if (page->len() > default_page_len_) DCHECK_LE(page->num_rows, 1); + // We only create pages when we have a row to append to them. 
+ DCHECK_GT(page->num_rows, 0); +} + +string BufferedTupleStream::DebugString() const { + stringstream ss; + ss << "BufferedTupleStream num_rows=" << num_rows_ + << " rows_returned=" << rows_returned_ << " pinned=" << pinned_ + << " delete_on_read=" << delete_on_read_ << " closed=" << closed_ << "\n" + << " bytes_pinned=" << bytes_pinned_ << " has_write_iterator=" << has_write_iterator_ + << " write_page=" << write_page_ << " has_read_iterator=" << has_read_iterator_ + << " read_page="; + if (read_page_ == pages_.end()) { + ss << "<end>"; + } else { + ss << &*read_page_; + } + ss << "\n" + << " read_page_reservation="; + if (read_page_reservation_.is_closed()) { + ss << "<closed>"; + } else { + ss << read_page_reservation_.GetReservation(); + } + ss << " write_page_reservation="; + if (write_page_reservation_.is_closed()) { + ss << "<closed>"; + } else { + ss << write_page_reservation_.GetReservation(); + } + ss << "\n # pages=" << num_pages_ << " pages=[\n"; + for (const Page& page : pages_) { + ss << "{" << page.DebugString() << "}"; + if (&page != &pages_.back()) ss << ",\n"; + } + ss << "]"; + return ss.str(); +} + +string BufferedTupleStream::Page::DebugString() const { + return Substitute("$0 num_rows=$1", handle.DebugString(), num_rows); +} + +Status BufferedTupleStream::Init(int node_id, bool pinned) { + if (!pinned) UnpinStream(UNPIN_ALL_EXCEPT_CURRENT); + node_id_ = node_id; + return Status::OK(); +} + +Status BufferedTupleStream::PrepareForWrite(bool* got_reservation) { + // This must be the first iterator created. + DCHECK(pages_.empty()); + DCHECK(!delete_on_read_); + DCHECK(!has_write_iterator()); + DCHECK(!has_read_iterator()); + CHECK_CONSISTENCY_FULL(); + + *got_reservation = buffer_pool_client_->IncreaseReservationToFit(default_page_len_); + if (!*got_reservation) return Status::OK(); + has_write_iterator_ = true; + // Save reservation for the write iterators. 
+ buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); + CHECK_CONSISTENCY_FULL(); + return Status::OK(); +} + +Status BufferedTupleStream::PrepareForReadWrite( + bool delete_on_read, bool* got_reservation) { + // This must be the first iterator created. + DCHECK(pages_.empty()); + DCHECK(!delete_on_read_); + DCHECK(!has_write_iterator()); + DCHECK(!has_read_iterator()); + CHECK_CONSISTENCY_FULL(); + + *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * default_page_len_); + if (!*got_reservation) return Status::OK(); + has_write_iterator_ = true; + // Save reservation for both the read and write iterators. + buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_); + buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); + RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read)); + return Status::OK(); +} + +void BufferedTupleStream::Close(RowBatch* batch, RowBatch::FlushMode flush) { + for (Page& page : pages_) { + if (batch != nullptr && page.retrieved_buffer) { + // Subtle: We only need to attach buffers from pages that we may have returned + // references to. ExtractBuffer() cannot fail for these pages because the data + // is guaranteed to already be in -memory. 
+ BufferPool::BufferHandle buffer; + Status status = buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer); + DCHECK(status.ok()); + batch->AddBuffer(buffer_pool_client_, move(buffer), flush); + } else { + buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle); + } + } + read_page_reservation_.Close(); + write_page_reservation_.Close(); + pages_.clear(); + num_pages_ = 0; + bytes_pinned_ = 0; + closed_ = true; +} + +int64_t BufferedTupleStream::CalcBytesPinned() const { + int64_t result = 0; + for (const Page& page : pages_) result += page.pin_count() * page.len(); + return result; +} + +Status BufferedTupleStream::PinPage(Page* page) { + RETURN_IF_ERROR(buffer_pool_->Pin(buffer_pool_client_, &page->handle)); + bytes_pinned_ += page->len(); + return Status::OK(); +} + +int BufferedTupleStream::ExpectedPinCount(bool stream_pinned, const Page* page) const { + return (stream_pinned || is_read_page(page) || is_write_page(page)) ? 1 : 0; +} + +Status BufferedTupleStream::PinPageIfNeeded(Page* page, bool stream_pinned) { + int new_pin_count = ExpectedPinCount(stream_pinned, page); + if (new_pin_count != page->pin_count()) { + DCHECK_EQ(new_pin_count, page->pin_count() + 1); + RETURN_IF_ERROR(PinPage(page)); + } + return Status::OK(); +} + +void BufferedTupleStream::UnpinPageIfNeeded(Page* page, bool stream_pinned) { + int new_pin_count = ExpectedPinCount(stream_pinned, page); + if (new_pin_count != page->pin_count()) { + DCHECK_EQ(new_pin_count, page->pin_count() - 1); + buffer_pool_->Unpin(buffer_pool_client_, &page->handle); + bytes_pinned_ -= page->len(); + if (page->pin_count() == 0) page->retrieved_buffer = false; + } +} + +bool BufferedTupleStream::NeedWriteReservation() const { + return NeedWriteReservation(pinned_); +} + +bool BufferedTupleStream::NeedWriteReservation(bool stream_pinned) const { + return NeedWriteReservation(stream_pinned, num_pages_, has_write_iterator(), + write_page_ != nullptr, has_read_write_page()); +} + +bool 
BufferedTupleStream::NeedWriteReservation(bool stream_pinned, int64_t num_pages, + bool has_write_iterator, bool has_write_page, bool has_read_write_page) { + if (!has_write_iterator) return false; + // If the stream is empty the write reservation hasn't been used yet. + if (num_pages == 0) return true; + if (stream_pinned) { + // Make sure we've saved the write reservation for the next page if the only + // page is a read/write page. + return has_read_write_page && num_pages == 1; + } else { + // Make sure we've saved the write reservation if it's not being used to pin + // a page in the stream. + return !has_write_page || has_read_write_page; + } +} + +bool BufferedTupleStream::NeedReadReservation() const { + return NeedReadReservation(pinned_); +} + +bool BufferedTupleStream::NeedReadReservation(bool stream_pinned) const { + return NeedReadReservation( + stream_pinned, num_pages_, has_read_iterator(), read_page_ != pages_.end()); +} + +bool BufferedTupleStream::NeedReadReservation(bool stream_pinned, int64_t num_pages, + bool has_read_iterator, bool has_read_page) const { + return NeedReadReservation(stream_pinned, num_pages, has_read_iterator, has_read_page, + has_write_iterator(), write_page_ != nullptr); +} + +bool BufferedTupleStream::NeedReadReservation(bool stream_pinned, int64_t num_pages, + bool has_read_iterator, bool has_read_page, bool has_write_iterator, + bool has_write_page) { + if (!has_read_iterator) return false; + if (stream_pinned) { + // Need reservation if there are no pages currently pinned for reading but we may add + // a page. + return num_pages == 0 && has_write_iterator; + } else { + // Only need to save reservation for an unpinned stream if there is no read page + // and we may advance to one in the future. 
+ return (has_write_iterator || num_pages > 0) && !has_read_page; + } +} + +Status BufferedTupleStream::NewWritePage(int64_t page_len) noexcept { + DCHECK(!closed_); + DCHECK(write_page_ == nullptr); + + Page new_page; + const BufferHandle* write_buffer; + RETURN_IF_ERROR(buffer_pool_->CreatePage( + buffer_pool_client_, page_len, &new_page.handle, &write_buffer)); + bytes_pinned_ += page_len; + total_byte_size_ += page_len; + + pages_.push_back(std::move(new_page)); + ++num_pages_; + write_page_ = &pages_.back(); + DCHECK_EQ(write_page_->num_rows, 0); + write_ptr_ = write_buffer->data(); + write_end_ptr_ = write_ptr_ + page_len; + return Status::OK(); +} + +Status BufferedTupleStream::CalcPageLenForRow(int64_t row_size, int64_t* page_len) { + if (UNLIKELY(row_size > max_page_len_)) { + return Status(TErrorCode::MAX_ROW_SIZE, + PrettyPrinter::Print(row_size, TUnit::BYTES), node_id_, + PrettyPrinter::Print(0, TUnit::BYTES)); + } + *page_len = max(default_page_len_, BitUtil::RoundUpToPowerOfTwo(row_size)); + return Status::OK(); +} + +Status BufferedTupleStream::AdvanceWritePage( + int64_t row_size, bool* got_reservation) noexcept { + DCHECK(has_write_iterator()); + CHECK_CONSISTENCY_FAST(); + + int64_t page_len; + RETURN_IF_ERROR(CalcPageLenForRow(row_size, &page_len)); + + // Reservation may have been saved for the next write page, e.g. by PrepareForWrite() + // if the stream is empty. + int64_t write_reservation_to_restore = 0, read_reservation_to_restore = 0; + if (NeedWriteReservation( + pinned_, num_pages_, true, write_page_ != nullptr, has_read_write_page()) + && !NeedWriteReservation(pinned_, num_pages_ + 1, true, true, false)) { + write_reservation_to_restore = default_page_len_; + } + // If the stream is pinned, we need to keep the previous write page pinned for reading. + // Check if we saved reservation for this case. 
+ if (NeedReadReservation(pinned_, num_pages_, has_read_iterator(), + read_page_ != pages_.end(), true, write_page_ != nullptr) + && !NeedReadReservation(pinned_, num_pages_ + 1, has_read_iterator(), + read_page_ != pages_.end(), true, true)) { + read_reservation_to_restore = default_page_len_; + } + + // We may reclaim reservation by unpinning a page that was pinned for writing. + int64_t write_page_reservation_to_reclaim = + (write_page_ != nullptr && !pinned_ && !has_read_write_page()) ? + write_page_->len() : 0; + // Check to see if we can get the reservation before changing the state of the stream. + if (!buffer_pool_client_->IncreaseReservationToFit(page_len + - write_reservation_to_restore - read_reservation_to_restore + - write_page_reservation_to_reclaim)) { + DCHECK(pinned_ || page_len > default_page_len_) + << "If the stream is unpinned, this should only fail for large pages"; + CHECK_CONSISTENCY_FAST(); + *got_reservation = false; + return Status::OK(); + } + if (write_reservation_to_restore > 0) { + buffer_pool_client_->RestoreReservation( + &write_page_reservation_, write_reservation_to_restore); + } + if (read_reservation_to_restore > 0) { + buffer_pool_client_->RestoreReservation( + &read_page_reservation_, read_reservation_to_restore); + } + ResetWritePage(); + RETURN_IF_ERROR(NewWritePage(page_len)); + *got_reservation = true; + return Status::OK(); +} + +void BufferedTupleStream::ResetWritePage() { + if (write_page_ == nullptr) return; + // Unpin the write page if we're reading in unpinned mode. + Page* prev_write_page = write_page_; + write_page_ = nullptr; + write_ptr_ = nullptr; + write_end_ptr_ = nullptr; + + // May need to decrement pin count now that it's not the write page, depending on + // the stream's mode. 
+ UnpinPageIfNeeded(prev_write_page, pinned_); +} + +void BufferedTupleStream::InvalidateWriteIterator() { + if (!has_write_iterator()) return; + ResetWritePage(); + has_write_iterator_ = false; + // No more pages will be appended to stream - do not need any write reservation. + write_page_reservation_.Close(); + // May not need a read reservation once the write iterator is invalidated. + if (NeedReadReservation(pinned_, num_pages_, has_read_iterator(), + read_page_ != pages_.end(), true, write_page_ != nullptr) + && !NeedReadReservation(pinned_, num_pages_, has_read_iterator(), + read_page_ != pages_.end(), false, false)) { + buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_); + } +} + +Status BufferedTupleStream::NextReadPage() { + DCHECK(has_read_iterator()); + DCHECK(!closed_); + CHECK_CONSISTENCY_FAST(); + + if (read_page_ == pages_.end()) { + // No rows read yet - start reading at first page. If the stream is unpinned, we can + // use the reservation saved in PrepareForReadWrite() to pin the first page. + read_page_ = pages_.begin(); + if (NeedReadReservation(pinned_, num_pages_, true, false) + && !NeedReadReservation(pinned_, num_pages_, true, true)) { + buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_); + } + } else if (delete_on_read_) { + DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " " + << DebugString(); + DCHECK_NE(&*read_page_, write_page_); + bytes_pinned_ -= pages_.front().len(); + buffer_pool_->DestroyPage(buffer_pool_client_, &pages_.front().handle); + pages_.pop_front(); + --num_pages_; + read_page_ = pages_.begin(); + } else { + // Unpin pages after reading them if needed. 
+ Page* prev_read_page = &*read_page_; + ++read_page_; + UnpinPageIfNeeded(prev_read_page, pinned_); + } + + if (read_page_ == pages_.end()) { + CHECK_CONSISTENCY_FULL(); + return Status::OK(); + } + + if (!pinned_ && read_page_->len() > default_page_len_ + && buffer_pool_client_->GetUnusedReservation() < read_page_->len()) { + // If we are iterating over an unpinned stream and encounter a page that is larger + // than the default page length, then unpinning the previous page may not have + // freed up enough reservation to pin the next one. The client is responsible for + // ensuring the reservation is available, so this indicates a bug. + return Status(TErrorCode::INTERNAL_ERROR, Substitute("Internal error: couldn't pin " + "large page of $0 bytes, client only had $1 bytes of unused reservation:\n$2", + read_page_->len(), buffer_pool_client_->GetUnusedReservation(), + buffer_pool_client_->DebugString())); + } + // Ensure the next page is pinned for reading. By this point we should have enough + // reservation to pin the page. If the stream is pinned, the page is already pinned. + // If the stream is unpinned, we freed up enough memory for a default-sized page by + // deleting or unpinning the previous page and ensured that, if the page was larger, + // that the reservation is available with the above check. + RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_)); + + // This waits for the pin to complete if the page was unpinned earlier. + const BufferHandle* read_buffer; + RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer)); + + read_page_rows_returned_ = 0; + read_ptr_ = read_buffer->data(); + read_end_ptr_ = read_ptr_ + read_buffer->len(); + + // We may need to save reservation for the write page in the case when the write page + // became a read/write page. 
+ if (!NeedWriteReservation(pinned_, num_pages_, has_write_iterator(), + write_page_ != nullptr, false) + && NeedWriteReservation(pinned_, num_pages_, has_write_iterator(), + write_page_ != nullptr, has_read_write_page())) { + buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); + } + CHECK_CONSISTENCY_FAST(); + return Status::OK(); +} + +void BufferedTupleStream::InvalidateReadIterator() { + if (read_page_ != pages_.end()) { + // Unpin the write page if we're reading in unpinned mode. + Page* prev_read_page = &*read_page_; + read_page_ = pages_.end(); + read_ptr_ = nullptr; + read_end_ptr_ = nullptr; + + // May need to decrement pin count after destroying read iterator. + UnpinPageIfNeeded(prev_read_page, pinned_); + } + has_read_iterator_ = false; + if (read_page_reservation_.GetReservation() > 0) { + buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_); + } + // It is safe to re-read a delete-on-read stream if no rows were read and no pages + // were therefore deleted. + if (rows_returned_ == 0) delete_on_read_ = false; +} + +Status BufferedTupleStream::PrepareForRead(bool delete_on_read, bool* got_reservation) { + CHECK_CONSISTENCY_FULL(); + InvalidateWriteIterator(); + InvalidateReadIterator(); + // If already pinned, no additional pin is needed (see ExpectedPinCount()). + *got_reservation = pinned_ || pages_.empty() + || buffer_pool_client_->IncreaseReservationToFit(default_page_len_); + if (!*got_reservation) return Status::OK(); + return PrepareForReadInternal(delete_on_read); +} + +Status BufferedTupleStream::PrepareForReadInternal(bool delete_on_read) { + DCHECK(!closed_); + DCHECK(!delete_on_read_); + DCHECK(!has_read_iterator()); + + has_read_iterator_ = true; + if (pages_.empty()) { + // No rows to return, or a the first read/write page has not yet been allocated. 
+ read_page_ = pages_.end(); + read_ptr_ = nullptr; + read_end_ptr_ = nullptr; + } else { + // Eagerly pin the first page in the stream. + read_page_ = pages_.begin(); + // Check if we need to increment the pin count of the read page. + RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_)); + DCHECK(read_page_->is_pinned()); + + // This waits for the pin to complete if the page was unpinned earlier. + const BufferHandle* read_buffer; + RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer)); + read_ptr_ = read_buffer->data(); + read_end_ptr_ = read_ptr_ + read_buffer->len(); + } + read_page_rows_returned_ = 0; + rows_returned_ = 0; + delete_on_read_ = delete_on_read; + CHECK_CONSISTENCY_FULL(); + return Status::OK(); +} + +Status BufferedTupleStream::PinStream(bool* pinned) { + DCHECK(!closed_); + CHECK_CONSISTENCY_FULL(); + if (pinned_) { + *pinned = true; + return Status::OK(); + } + *pinned = false; + // First, make sure we have the reservation to pin all the pages for reading. + int64_t bytes_to_pin = 0; + for (Page& page : pages_) { + bytes_to_pin += (ExpectedPinCount(true, &page) - page.pin_count()) * page.len(); + } + + // Check if we have some reservation to restore. + bool restore_write_reservation = + NeedWriteReservation(false) && !NeedWriteReservation(true); + bool restore_read_reservation = + NeedReadReservation(false) && !NeedReadReservation(true); + int64_t increase_needed = bytes_to_pin + - (restore_write_reservation ? default_page_len_ : 0) + - (restore_read_reservation ? default_page_len_ : 0); + bool reservation_granted = + buffer_pool_client_->IncreaseReservationToFit(increase_needed); + if (!reservation_granted) return Status::OK(); + + // If there is no current write page we should have some saved reservation to use. + // Only continue saving it if the stream is empty and need it to pin the first page. 
+ if (restore_write_reservation) { + buffer_pool_client_->RestoreReservation(&write_page_reservation_, default_page_len_); + } + if (restore_read_reservation) { + buffer_pool_client_->RestoreReservation(&read_page_reservation_, default_page_len_); + } + + // At this point success is guaranteed - go through to pin the pages we need to pin. + // If the page data was evicted from memory, the read I/O can happen in parallel + // because we defer calling GetBuffer() until NextReadPage(). + for (Page& page : pages_) RETURN_IF_ERROR(PinPageIfNeeded(&page, true)); + + pinned_ = true; + *pinned = true; + CHECK_CONSISTENCY_FULL(); + return Status::OK(); +} + +void BufferedTupleStream::UnpinStream(UnpinMode mode) { + CHECK_CONSISTENCY_FULL(); + DCHECK(!closed_); + if (mode == UNPIN_ALL) { + // Invalidate the iterators so they don't keep pages pinned. + InvalidateWriteIterator(); + InvalidateReadIterator(); + } + + if (pinned_) { + CHECK_CONSISTENCY_FULL(); + // If the stream was pinned, there may be some remaining pinned pages that should + // be unpinned at this point. + for (Page& page : pages_) UnpinPageIfNeeded(&page, false); + + // Check to see if we need to save some of the reservation we freed up. + if (!NeedWriteReservation(true) && NeedWriteReservation(false)) { + buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); + } + if (!NeedReadReservation(true) && NeedReadReservation(false)) { + buffer_pool_client_->SaveReservation(&read_page_reservation_, default_page_len_); + } + pinned_ = false; + } + CHECK_CONSISTENCY_FULL(); +} + +Status BufferedTupleStream::GetRows( + MemTracker* tracker, scoped_ptr<RowBatch>* batch, bool* got_rows) { + if (num_rows() > numeric_limits<int>::max()) { + // RowBatch::num_rows_ is a 32-bit int, avoid an overflow. + return Status(Substitute("Trying to read $0 rows into in-memory batch failed. 
Limit " + "is $1", + num_rows(), numeric_limits<int>::max())); + } + RETURN_IF_ERROR(PinStream(got_rows)); + if (!*got_rows) return Status::OK(); + bool got_reservation; + RETURN_IF_ERROR(PrepareForRead(false, &got_reservation)); + DCHECK(got_reservation) << "Stream was pinned"; + batch->reset(new RowBatch(desc_, num_rows(), tracker)); + bool eos = false; + // Loop until GetNext fills the entire batch. Each call can stop at page + // boundaries. We generally want it to stop, so that pages can be freed + // as we read. It is safe in this case because we pin the entire stream. + while (!eos) { + RETURN_IF_ERROR(GetNext(batch->get(), &eos)); + } + return Status::OK(); +} + +Status BufferedTupleStream::GetNext(RowBatch* batch, bool* eos) { + return GetNextInternal<false>(batch, eos, nullptr); +} + +Status BufferedTupleStream::GetNext( + RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) { + return GetNextInternal<true>(batch, eos, flat_rows); +} + +template <bool FILL_FLAT_ROWS> +Status BufferedTupleStream::GetNextInternal( + RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) { + if (has_nullable_tuple_) { + return GetNextInternal<FILL_FLAT_ROWS, true>(batch, eos, flat_rows); + } else { + return GetNextInternal<FILL_FLAT_ROWS, false>(batch, eos, flat_rows); + } +} + +template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE> +Status BufferedTupleStream::GetNextInternal( + RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) { + DCHECK(!closed_); + DCHECK(batch->row_desc()->Equals(*desc_)); + DCHECK(is_pinned() || !FILL_FLAT_ROWS) + << "FlatRowPtrs are only valid for pinned streams"; + *eos = (rows_returned_ == num_rows_); + if (*eos) return Status::OK(); + + if (UNLIKELY(read_page_ == pages_.end() + || read_page_rows_returned_ == read_page_->num_rows)) { + // Get the next page in the stream (or the first page if read_page_ was not yet + // initialized.) 
We need to do this at the beginning of the GetNext() call to ensure + // the buffer management semantics. NextReadPage() may unpin or delete the buffer + // backing the rows returned from the *previous* call to GetNext(). + RETURN_IF_ERROR(NextReadPage()); + } + + DCHECK(has_read_iterator()); + DCHECK(read_page_ != pages_.end()); + DCHECK(read_page_->is_pinned()) << DebugString(); + DCHECK_GE(read_page_rows_returned_, 0); + + int rows_left_in_page = read_page_->num_rows - read_page_rows_returned_; + int rows_to_fill = std::min(batch->capacity() - batch->num_rows(), rows_left_in_page); + DCHECK_GE(rows_to_fill, 1); + uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows())); + + // Produce tuple rows from the current page and the corresponding position on the + // null tuple indicator. + if (FILL_FLAT_ROWS) { + DCHECK(flat_rows != nullptr); + DCHECK(!delete_on_read_); + DCHECK_EQ(batch->num_rows(), 0); + flat_rows->clear(); + flat_rows->reserve(rows_to_fill); + } + + const uint64_t tuples_per_row = desc_->tuple_descriptors().size(); + // Start reading from the current position in 'read_page_'. + for (int i = 0; i < rows_to_fill; ++i) { + if (FILL_FLAT_ROWS) { + flat_rows->push_back(read_ptr_); + DCHECK_EQ(flat_rows->size(), i + 1); + } + // Copy the row into the output batch. + TupleRow* output_row = reinterpret_cast<TupleRow*>(tuple_row_mem); + tuple_row_mem += sizeof(Tuple*) * tuples_per_row; + UnflattenTupleRow<HAS_NULLABLE_TUPLE>(&read_ptr_, output_row); + + // Update string slot ptrs, skipping external strings. + for (int j = 0; j < inlined_string_slots_.size(); ++j) { + Tuple* tuple = output_row->GetTuple(inlined_string_slots_[j].first); + if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue; + FixUpStringsForRead(inlined_string_slots_[j].second, tuple); + } + + // Update collection slot ptrs, skipping external collections. 
We traverse the + // collection structure in the same order as it was written to the stream, allowing + // us to infer the data layout based on the length of collections and strings. + for (int j = 0; j < inlined_coll_slots_.size(); ++j) { + Tuple* tuple = output_row->GetTuple(inlined_coll_slots_[j].first); + if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue; + FixUpCollectionsForRead(inlined_coll_slots_[j].second, tuple); + } + } + + batch->CommitRows(rows_to_fill); + rows_returned_ += rows_to_fill; + read_page_rows_returned_ += rows_to_fill; + *eos = (rows_returned_ == num_rows_); + if (read_page_rows_returned_ == read_page_->num_rows && (!pinned_ || delete_on_read_)) { + // No more data in this page. The batch must be immediately returned up the operator + // tree and deep copied so that NextReadPage() can reuse the read page's buffer. + // TODO: IMPALA-4179 - instead attach the buffer and flush the resources. + batch->MarkNeedsDeepCopy(); + } + if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill); + DCHECK_LE(read_ptr_, read_end_ptr_); + return Status::OK(); +} + +void BufferedTupleStream::FixUpStringsForRead( + const vector<SlotDescriptor*>& string_slots, Tuple* tuple) { + DCHECK(tuple != nullptr); + for (const SlotDescriptor* slot_desc : string_slots) { + if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; + + StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset()); + DCHECK_LE(read_ptr_ + sv->len, read_end_ptr_); + sv->ptr = reinterpret_cast<char*>(read_ptr_); + read_ptr_ += sv->len; + } +} + +void BufferedTupleStream::FixUpCollectionsForRead( + const vector<SlotDescriptor*>& collection_slots, Tuple* tuple) { + DCHECK(tuple != nullptr); + for (const SlotDescriptor* slot_desc : collection_slots) { + if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; + + CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset()); + const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor(); + int 
coll_byte_size = cv->num_tuples * item_desc.byte_size(); + DCHECK_LE(read_ptr_ + coll_byte_size, read_end_ptr_); + cv->ptr = reinterpret_cast<uint8_t*>(read_ptr_); + read_ptr_ += coll_byte_size; + + if (!item_desc.HasVarlenSlots()) continue; + uint8_t* coll_data = cv->ptr; + for (int i = 0; i < cv->num_tuples; ++i) { + Tuple* item = reinterpret_cast<Tuple*>(coll_data); + FixUpStringsForRead(item_desc.string_slots(), item); + FixUpCollectionsForRead(item_desc.collection_slots(), item); + coll_data += item_desc.byte_size(); + } + } +} + +int64_t BufferedTupleStream::ComputeRowSize(TupleRow* row) const noexcept { + int64_t size = 0; + if (has_nullable_tuple_) { + size += NullIndicatorBytesPerRow(); + for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) { + if (row->GetTuple(i) != nullptr) size += fixed_tuple_sizes_[i]; + } + } else { + for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) { + size += fixed_tuple_sizes_[i]; + } + } + for (int i = 0; i < inlined_string_slots_.size(); ++i) { + Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first); + if (tuple == nullptr) continue; + const vector<SlotDescriptor*>& slots = inlined_string_slots_[i].second; + for (auto it = slots.begin(); it != slots.end(); ++it) { + if (tuple->IsNull((*it)->null_indicator_offset())) continue; + size += tuple->GetStringSlot((*it)->tuple_offset())->len; + } + } + + for (int i = 0; i < inlined_coll_slots_.size(); ++i) { + Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first); + if (tuple == nullptr) continue; + const vector<SlotDescriptor*>& slots = inlined_coll_slots_[i].second; + for (auto it = slots.begin(); it != slots.end(); ++it) { + if (tuple->IsNull((*it)->null_indicator_offset())) continue; + CollectionValue* cv = tuple->GetCollectionSlot((*it)->tuple_offset()); + const TupleDescriptor& item_desc = *(*it)->collection_item_descriptor(); + size += cv->num_tuples * item_desc.byte_size(); + + if (!item_desc.HasVarlenSlots()) continue; + for (int j = 0; j < cv->num_tuples; ++j) { 
+ Tuple* item = reinterpret_cast<Tuple*>(&cv->ptr[j * item_desc.byte_size()]); + size += item->VarlenByteSize(item_desc); + } + } + } + return size; +} + +bool BufferedTupleStream::AddRowSlow(TupleRow* row, Status* status) noexcept { + // Use AddRowCustom*() to do the work of advancing the page. + int64_t row_size = ComputeRowSize(row); + uint8_t* data = AddRowCustomBeginSlow(row_size, status); + if (data == nullptr) return false; + bool success = DeepCopy(row, &data, data + row_size); + DCHECK(success); + DCHECK_EQ(data, write_ptr_); + AddRowCustomEnd(row_size); + return true; +} + +uint8_t* BufferedTupleStream::AddRowCustomBeginSlow( + int64_t size, Status* status) noexcept { + bool got_reservation; + *status = AdvanceWritePage(size, &got_reservation); + if (!status->ok() || !got_reservation) return nullptr; + + // We have a large-enough page so now success is guaranteed. + uint8_t* result = AddRowCustomBegin(size, status); + DCHECK(result != nullptr); + return result; +} + +void BufferedTupleStream::AddLargeRowCustomEnd(int64_t size) noexcept { + DCHECK_GT(size, default_page_len_); + // Immediately unpin the large write page so that we're not using up extra reservation + // and so we don't append another row to the page. + ResetWritePage(); + // Save some of the reservation we freed up so we can create the next write page when + // needed. + if (NeedWriteReservation()) { + buffer_pool_client_->SaveReservation(&write_page_reservation_, default_page_len_); + } + // The stream should be in a consistent state once the row is added. 
+ CHECK_CONSISTENCY_FAST(); +} + +bool BufferedTupleStream::AddRow(TupleRow* row, Status* status) noexcept { + DCHECK(!closed_); + DCHECK(has_write_iterator()); + if (UNLIKELY(write_page_ == nullptr || !DeepCopy(row, &write_ptr_, write_end_ptr_))) { + return AddRowSlow(row, status); + } + ++num_rows_; + ++write_page_->num_rows; + return true; +} + +bool BufferedTupleStream::DeepCopy( + TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept { + return has_nullable_tuple_ ? DeepCopyInternal<true>(row, data, data_end) : + DeepCopyInternal<false>(row, data, data_end); +} + +// TODO: consider codegening this. +// TODO: in case of duplicate tuples, this can redundantly serialize data. +template <bool HAS_NULLABLE_TUPLE> +bool BufferedTupleStream::DeepCopyInternal( + TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept { + uint8_t* pos = *data; + const uint64_t tuples_per_row = desc_->tuple_descriptors().size(); + // Copy the not NULL fixed len tuples. For the NULL tuples just update the NULL tuple + // indicator. + if (HAS_NULLABLE_TUPLE) { + int null_indicator_bytes = NullIndicatorBytesPerRow(); + if (UNLIKELY(pos + null_indicator_bytes > data_end)) return false; + uint8_t* null_indicators = pos; + pos += NullIndicatorBytesPerRow(); + memset(null_indicators, 0, null_indicator_bytes); + for (int i = 0; i < tuples_per_row; ++i) { + uint8_t* null_word = null_indicators + (i >> 3); + const uint32_t null_pos = i & 7; + const int tuple_size = fixed_tuple_sizes_[i]; + Tuple* t = row->GetTuple(i); + const uint8_t mask = 1 << (7 - null_pos); + if (t != nullptr) { + if (UNLIKELY(pos + tuple_size > data_end)) return false; + memcpy(pos, t, tuple_size); + pos += tuple_size; + } else { + *null_word |= mask; + } + } + } else { + // If we know that there are no nullable tuples no need to set the nullability flags. 
+ for (int i = 0; i < tuples_per_row; ++i) { + const int tuple_size = fixed_tuple_sizes_[i]; + if (UNLIKELY(pos + tuple_size > data_end)) return false; + Tuple* t = row->GetTuple(i); + // TODO: Once IMPALA-1306 (Avoid passing empty tuples of non-materialized slots) + // is delivered, the check below should become DCHECK(t != nullptr). + DCHECK(t != nullptr || tuple_size == 0); + memcpy(pos, t, tuple_size); + pos += tuple_size; + } + } + + // Copy inlined string slots. Note: we do not need to convert the string ptrs to offsets + // on the write path, only on the read. The tuple data is immediately followed + // by the string data so only the len information is necessary. + for (int i = 0; i < inlined_string_slots_.size(); ++i) { + const Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first); + if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue; + if (UNLIKELY(!CopyStrings(tuple, inlined_string_slots_[i].second, &pos, data_end))) + return false; + } + + // Copy inlined collection slots. We copy collection data in a well-defined order so + // we do not need to convert pointers to offsets on the write path. 
+ for (int i = 0; i < inlined_coll_slots_.size(); ++i) { + const Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first); + if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue; + if (UNLIKELY(!CopyCollections(tuple, inlined_coll_slots_[i].second, &pos, data_end))) + return false; + } + *data = pos; + return true; +} + +bool BufferedTupleStream::CopyStrings(const Tuple* tuple, + const vector<SlotDescriptor*>& string_slots, uint8_t** data, const uint8_t* data_end) { + for (const SlotDescriptor* slot_desc : string_slots) { + if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; + const StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset()); + if (LIKELY(sv->len > 0)) { + if (UNLIKELY(*data + sv->len > data_end)) return false; + + memcpy(*data, sv->ptr, sv->len); + *data += sv->len; + } + } + return true; +} + +bool BufferedTupleStream::CopyCollections(const Tuple* tuple, + const vector<SlotDescriptor*>& collection_slots, uint8_t** data, const uint8_t* data_end) { + for (const SlotDescriptor* slot_desc : collection_slots) { + if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; + const CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset()); + const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor(); + if (LIKELY(cv->num_tuples > 0)) { + int coll_byte_size = cv->num_tuples * item_desc.byte_size(); + if (UNLIKELY(*data + coll_byte_size > data_end)) return false; + uint8_t* coll_data = *data; + memcpy(coll_data, cv->ptr, coll_byte_size); + *data += coll_byte_size; + + if (!item_desc.HasVarlenSlots()) continue; + // Copy variable length data when present in collection items. 
+ for (int i = 0; i < cv->num_tuples; ++i) { + const Tuple* item = reinterpret_cast<Tuple*>(coll_data); + if (UNLIKELY(!CopyStrings(item, item_desc.string_slots(), data, data_end))) { + return false; + } + if (UNLIKELY( + !CopyCollections(item, item_desc.collection_slots(), data, data_end))) { + return false; + } + coll_data += item_desc.byte_size(); + } + } + } + return true; +} + +void BufferedTupleStream::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const { + DCHECK(row != nullptr); + DCHECK(!closed_); + DCHECK(is_pinned()); + DCHECK(!delete_on_read_); + uint8_t* data = flat_row; + return has_nullable_tuple_ ? UnflattenTupleRow<true>(&data, row) : + UnflattenTupleRow<false>(&data, row); +} + +template <bool HAS_NULLABLE_TUPLE> +void BufferedTupleStream::UnflattenTupleRow(uint8_t** data, TupleRow* row) const { + const int tuples_per_row = desc_->tuple_descriptors().size(); + uint8_t* ptr = *data; + if (has_nullable_tuple_) { + // Stitch together the tuples from the page and the NULL ones. + const uint8_t* null_indicators = ptr; + ptr += NullIndicatorBytesPerRow(); + for (int i = 0; i < tuples_per_row; ++i) { + const uint8_t* null_word = null_indicators + (i >> 3); + const uint32_t null_pos = i & 7; + const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0); + row->SetTuple( + i, reinterpret_cast<Tuple*>(reinterpret_cast<uint64_t>(ptr) * is_not_null)); + ptr += fixed_tuple_sizes_[i] * is_not_null; + } + } else { + for (int i = 0; i < tuples_per_row; ++i) { + row->SetTuple(i, reinterpret_cast<Tuple*>(ptr)); + ptr += fixed_tuple_sizes_[i]; + } + } + *data = ptr; +}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/runtime/buffered-tuple-stream.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h new file mode 100644 index 0000000..dbf3faf --- /dev/null +++ b/be/src/runtime/buffered-tuple-stream.h @@ -0,0 +1,705 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_H +#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_H + +#include <set> +#include <vector> +#include <boost/scoped_ptr.hpp> +#include <boost/function.hpp> + +#include "common/global-types.h" +#include "common/status.h" +#include "gutil/macros.h" +#include "runtime/bufferpool/buffer-pool.h" +#include "runtime/row-batch.h" + +namespace impala { + +class MemTracker; +class RuntimeState; +class RowDescriptor; +class SlotDescriptor; +class Tuple; +class TupleRow; + +/// Class that provides an abstraction for a stream of tuple rows backed by BufferPool +/// Pages. Rows can be added to the stream and read back. Rows are returned in the order +/// they are added. 
+/// +/// The BufferedTupleStream is *not* thread safe from the caller's point of view. +/// Different threads should not concurrently call methods of the same BufferedTupleStream +/// object. +/// +/// Reading and writing the stream: +/// The stream supports two modes of reading/writing, depending on whether +/// PrepareForWrite() is called to initialize a write iterator only or +/// PrepareForReadWrite() is called to initialize both read and write iterators to enable +/// interleaved reads and writes. +/// +/// To use write-only mode, PrepareForWrite() is called once and AddRow()/AddRowCustom*() +/// are called repeatedly to initialize then advance a write iterator through the stream. +/// Once the stream is fully written, it can be read back by calling PrepareForRead() +/// then GetNext() repeatedly to advance a read iterator through the stream, or by +/// calling GetRows() to get all of the rows at once. +/// +/// To use read/write mode, PrepareForReadWrite() is called once to initialize the read +/// and write iterators. AddRow()/AddRowCustom*() then advance a write iterator through +/// the stream, and GetNext() advances a trailing read iterator through the stream. +/// +/// Buffer management: +/// The tuple stream is backed by a sequence of BufferPool Pages. The tuple stream uses +/// the client's reservation to pin pages in memory. It will automatically try to +/// increase the client's reservation whenever it needs to do so to make progress. +/// +/// Normally pages are all of the same default page length, but larger pages up to the +/// max page length are used if needed to store rows that are too large for a +/// default-length page. +/// +/// The stream has both pinned and unpinned modes. In the pinned mode all pages are +/// pinned for reading. The pinned mode avoids I/O by keeping all pages pinned in memory +/// and allows clients to save pointers to rows in the stream and randomly access them. +/// E.g. 
hash tables can be backed by a BufferedTupleStream. In the unpinned mode, only +/// pages currently being read and written are pinned and other pages are unpinned and +/// therefore do not use the client's reservation and can be spilled to disk. The stream +/// always holds onto a default page's worth of reservation for the read and write +/// iterators (i.e. two page's worth if the stream is in read/write mode), even if that +/// many pages are not currently pinned. This means that UnpinStream() always succeeds, +/// and moving to the next default-length write page or read page on an unpinned stream +/// does not require additional reservation. This is implemented by saving reservations +/// in SubReservations. +/// +/// To read or write a row larger than the default page size to/from an unpinned stream, +/// the client must have max_page_len - default_page_len unused reservation. Writing a +/// large row to an unpinned stream only uses the reservation for the duration of the +/// AddRow()/AddRowCustom*() call. Reading a large row from an unpinned stream uses the +/// reservation until the next call to GetNext(). E.g. to partition a single unpinned +/// stream into n unpinned streams, the reservation needed is (n - 1) * +/// default_page_len + 2 * max_page_len: one large read buffer and one large write +/// buffer is needed to keep the row being processed in-memory, but only default-sized +/// buffers are needed for the other streams being written. +/// +/// The tuple stream also supports a 'delete_on_read' mode, enabled by passing a flag +/// to PrepareForRead() which deletes the stream's pages as it does a final read +/// pass over the stream. +/// +/// TODO: IMPALA-4179: the buffer management can be simplified once we can attach +/// buffers to RowBatches. +/// +/// Page layout: +/// Rows are stored back to back starting at the first byte of each page's buffer, with +/// no interleaving of data from different rows. 
There is no padding or alignment +/// between rows. Rows larger than the default page length are stored on their own +/// page. +/// +/// Tuple row layout: +/// If the stream's tuples are nullable (i.e. has_nullable_tuple_ is true), there is a +/// bitstring at the start of each row with null indicators for all tuples in each row +/// (including non-nullable tuples). The bitstring occupies ceil(num_tuples_per_row / 8) +/// bytes. A 1 indicates the tuple is null. +/// +/// The fixed length parts of the row's tuples are stored first, followed by var len data +/// for inlined_string_slots_ and inlined_coll_slots_. Other "external" var len slots can +/// point to var len data outside the stream. When reading the stream, the length of each +/// row's var len data in the stream must be computed to find the next row's start. +/// +/// The tuple stream supports reading from the stream into RowBatches without copying +/// out any data: the RowBatches' Tuple pointers will point directly into the stream's +/// pages' buffers. The fixed length parts follow Impala's internal tuple format, so for +/// the tuple to be valid, we only need to update pointers to point to the var len data +/// in the stream. These pointers need to be updated by the stream because a spilled +/// page's data may be relocated to a different buffer. The pointers are updated lazily +/// upon reading the stream via GetNext() or GetRows(). +/// +/// Example layout for a row with two non-nullable tuples ((1, "hello"), (2, "world")) +/// with all var len data stored in the stream: +/// <---- tuple 1 -----> <------ tuple 2 ------> <- var len -> <- next row ... +/// +--------+-----------+-----------+-----------+-------------+ +/// | IntVal | StringVal | BigIntVal | StringVal | | ... +/// +--------+-----------+-----------+-----------++------------+ +/// | val: 1 | len: 5 | val: 2 | len: 5 | helloworld | ... +/// | | ptr: 0x.. | | ptr: 0x.. | | ... 
+/// +--------+-----------+-----------+-----------+-------------+ +/// <--4b--> <---12b---> <----8b---> <---12b---> <----10b----> +/// +/// Example layout for a row with the second tuple nullable ((1, "hello"), NULL) +/// with all var len data stored in the stream: +/// <- null tuple bitstring -> <---- tuple 1 -----> <- var len -> <- next row ... +/// +-------------------------+--------+-----------+------------+ +/// | | IntVal | StringVal | | ... +/// +-------------------------+--------+-----------+------------+ +/// | 0000 0010 | val: 1 | len: 5 | hello | ... +/// | | | ptr: 0x.. | | ... +/// +-------------------------+--------+-----------+------------+ +/// <---------1b------------> <--4b--> <---12b---> <----5b----> +/// +/// Example layout for a row with a single non-nullable tuple (("hello", "world")) with +/// the second string slot stored externally to the stream: +/// <------ tuple 1 ------> <- var len -> <- next row ... +/// +-----------+-----------+-------------+ +/// | StringVal | StringVal | | ... +/// +-----------+-----------+-------------+ +/// | len: 5 | len: 5 | hello | ... +/// | ptr: 0x.. | ptr: 0x.. | | ... +/// +-----------+-----------+-------------+ +/// <---12b---> <---12b---> <-----5b----> +/// +/// The behavior of reads and writes is as follows: +/// Read: +/// 1. Unpinned: Only a single read page is pinned at a time. This means that only +/// enough reservation to pin a single page is needed to read the stream, regardless +/// of the stream's size. Each page is deleted or unpinned (if delete on read is true +/// or false respectively) before advancing to the next page. +/// 2. Pinned: All pages in the stream are pinned so do not need to be pinned or +/// unpinned when reading from the stream. If delete on read is true, pages are +/// deleted after being read. If the stream was previously unpinned, the page's data +/// may not yet be in memory - reading from the stream can block on I/O or fail with +/// an I/O error. +/// Write: +/// 1. 
Unpinned: Unpin pages as they fill up. This means that only enough reservation +/// to pin a single write page is required to write to the stream, regardless of the +/// stream's size. +/// 2. Pinned: Pages are left pinned. If the next page in the stream cannot be pinned +/// because the client's reservation is insufficient (and could not be increased by +/// the stream), the write call will fail and the client can either unpin the stream +/// or free up other memory before retrying. +/// +/// Memory lifetime of rows read from stream: +/// If the stream is pinned and delete on read is false, it is valid to access any tuples +/// returned via GetNext() or GetRows() until the stream is unpinned. If the stream is +/// unpinned or delete on read is true, then the batch returned from GetNext() may have +/// the needs_deep_copy flag set, which means that any tuple memory returned so far from +/// the stream may be freed on the next call to GetNext(). +/// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch. +/// +/// Manual construction of rows with AddRowCustomBegin()/AddRowCustomEnd(): +/// The BufferedTupleStream supports allocation of uninitialized rows with +/// AddRowCustom*(). AddRowCustomBegin() is called instead of AddRow() if the client wants +/// to manually construct a row. The caller of AddRowCustomBegin() is responsible for +/// writing the row with exactly the layout described above then calling +/// AddRowCustomEnd() when done. +/// +/// If a caller constructs a tuple in this way, the caller can set the pointers and they +/// will not be modified until the stream is read via GetNext() or GetRows(). +/// TODO: IMPALA-5007: try to remove AddRowCustom*() by unifying with AddRow(). +/// +/// TODO: we need to be able to do read ahead for pages. We need some way to indicate a +/// page will need to be pinned soon. +class BufferedTupleStream { + public: + /// A pointer to the start of a flattened TupleRow in the stream.
+ typedef uint8_t* FlatRowPtr; + + /// row_desc: description of rows stored in the stream. This is the desc for rows + /// that are added and the rows being returned. + /// page_len: the size of pages to use in the stream + /// ext_varlen_slots: set of varlen slots with data stored externally to the stream + BufferedTupleStream(RuntimeState* state, const RowDescriptor* row_desc, + BufferPool::ClientHandle* buffer_pool_client, int64_t default_page_len, + int64_t max_page_len, + const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>()); + + virtual ~BufferedTupleStream(); + + /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called + /// once before any of the other APIs. + /// If 'pinned' is true, the tuple stream starts off pinned, otherwise it is unpinned. + /// 'node_id' is only used for error reporting. + Status Init(int node_id, bool pinned) WARN_UNUSED_RESULT; + + /// Prepares the stream for writing by saving enough reservation for a default-size + /// write page. Tries to increase reservation if there is not enough unused reservation + /// for a page. Called after Init() and before the first AddRow() or + /// AddRowCustomBegin() call. + /// 'got_reservation': set to true if there was enough reservation to initialize the + /// first write page and false if there was not enough reservation and no other + /// error was encountered. Undefined if an error status is returned. + Status PrepareForWrite(bool* got_reservation) WARN_UNUSED_RESULT; + + /// Prepares the stream for interleaved reads and writes by saving enough reservation + /// for default-sized read and write pages. Called after Init() and before the first + /// AddRow() or AddRowCustomBegin() call. + /// 'delete_on_read': Pages are deleted after they are read. + /// 'got_reservation': set to true if there was enough reservation to initialize the + /// read and write pages and false if there was not enough reservation and no other + /// error was encountered. 
Undefined if an error status is returned. + Status PrepareForReadWrite( + bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT; + + /// Prepares the stream for reading, invalidating the write iterator (if there is one). + /// Therefore must be called after the last AddRow() or AddRowCustomEnd() and before + /// GetNext(). PrepareForRead() can be called multiple times to do multiple read passes + /// over the stream, unless rows were read from the stream after PrepareForRead() or + /// PrepareForReadWrite() was called with delete_on_read = true. + /// 'delete_on_read': Pages are deleted after they are read. + /// 'got_reservation': set to true if there was enough reservation to initialize the + /// first read page and false if there was not enough reservation and no other + /// error was encountered. Undefined if an error status is returned. + Status PrepareForRead(bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT; + + /// Adds a single row to the stream. There are four possible outcomes: + /// a) The append succeeds. True is returned. + /// b) The append fails because the unused reservation was not sufficient to add + /// a new page to the stream large enough to fit 'row' and the stream could not + /// increase the reservation to get enough unused reservation. Returns false and + /// sets 'status' to OK. The append can be retried after freeing up memory or + /// unpinning the stream. + /// c) The append fails with a runtime error. Returns false and sets 'status' to an + /// error. + /// d) The append fails because the row is too large to fit in a page of a stream. + /// Returns false and sets 'status' to an error. + /// + /// Unpinned streams can only encounter case b) when appending a row larger than + /// the default page size and the reservation could not be increased sufficiently. + /// Otherwise enough memory is automatically freed up by unpinning the current write + /// page.
+ /// + /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow() + /// returns an error, it should not be called again. + bool AddRow(TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT; + + /// Allocates space to store a row of 'size' bytes (including fixed and variable length + /// data). If successful, returns a pointer to the allocated row. The caller then must + /// write valid data to the row and call AddRowCustomEnd(). + /// + /// If unsuccessful, returns nullptr. The failure modes are the same as described in the + /// AddRow() comment. + ALWAYS_INLINE uint8_t* AddRowCustomBegin(int64_t size, Status* status); + + /// Called after AddRowCustomBegin() when done writing the row. Should only be called + /// if AddRowCustomBegin() succeeded. See the AddRowCustomBegin() comment for + /// explanation. + /// 'size': the size passed into AddRowCustomBegin(). + void AddRowCustomEnd(int64_t size); + + /// Unflattens 'flat_row' into a regular TupleRow 'row'. Only valid to call if the + /// stream is pinned. The row must have been allocated with the stream's row desc. + /// The returned 'row' is backed by memory from the stream so is only valid as long + /// as the stream is pinned. + void GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const; + + /// Pins all pages in this stream and switches to pinned mode. Has no effect if the + /// stream is already pinned. + /// If the current unused reservation is not sufficient to pin the stream in memory, + /// this will try to increase the reservation. If that fails, 'pinned' is set to false + /// and the stream is left unpinned. Otherwise 'pinned' is set to true. + Status PinStream(bool* pinned) WARN_UNUSED_RESULT; + + /// Modes for UnpinStream(). + enum UnpinMode { + /// All pages in the stream are unpinned and the read/write positions in the stream + /// are reset. No more rows can be written to the stream after this.
The stream can + /// be re-read from the beginning by calling PrepareForRead(). + UNPIN_ALL, + /// All pages are unpinned aside from the current read and write pages (if any), + /// which is left in the same state. The unpinned stream can continue being read + /// or written from the current read or write positions. + UNPIN_ALL_EXCEPT_CURRENT, + }; + + /// Unpins stream with the given 'mode' as described above. + void UnpinStream(UnpinMode mode); + + /// Get the next batch of output rows, which are backed by the stream's memory. + /// If the stream is unpinned or 'delete_on_read' is true, the 'needs_deep_copy' + /// flag may be set on 'batch' to signal that memory will be freed on the next + /// call to GetNext() and that the caller should copy out any data it needs from + /// rows in 'batch' or in previous batches returned from GetNext(). + /// + /// If the stream is pinned and 'delete_on_read' is false, the memory backing the + /// rows will remain valid until the stream is unpinned, destroyed, etc. + /// TODO: IMPALA-4179: update when we simplify the memory transfer model. + Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT; + + /// Same as above, but populate 'flat_rows' with a pointer to the flat version of + /// each returned row in the pinned stream. The pointers in 'flat_rows' are only + /// valid as long as the stream remains pinned. + Status GetNext( + RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows) WARN_UNUSED_RESULT; + + /// Returns all the rows in the stream in batch. This pins the entire stream in the + /// process. If the current unused reservation is not sufficient to pin the stream in + /// memory, this will try to increase the reservation. If that fails, 'got_rows' is set + /// to false. + Status GetRows(MemTracker* tracker, boost::scoped_ptr<RowBatch>* batch, + bool* got_rows) WARN_UNUSED_RESULT; + + /// Must be called once at the end to cleanup all resources. 
If 'batch' is non-NULL, + /// attaches buffers from pinned pages that rows returned from GetNext() may reference. + /// Otherwise deletes all pages. Does nothing if the stream was already closed. The + /// 'flush' mode is forwarded to RowBatch::AddBuffer() when attaching buffers. + void Close(RowBatch* batch, RowBatch::FlushMode flush); + + /// Number of rows in the stream. + int64_t num_rows() const { return num_rows_; } + + /// Number of rows returned via GetNext(). + int64_t rows_returned() const { return rows_returned_; } + + /// Returns the byte size necessary to store the entire stream in memory. + int64_t byte_size() const { return total_byte_size_; } + + /// Returns the number of bytes currently pinned in memory by the stream. + /// If ignore_current is true, the write_page_ memory is not included. + int64_t BytesPinned(bool ignore_current) const { + if (ignore_current && write_page_ != nullptr && write_page_->is_pinned()) { + return bytes_pinned_ - write_page_->len(); + } + return bytes_pinned_; + } + + bool is_closed() const { return closed_; } + bool is_pinned() const { return pinned_; } + bool has_read_iterator() const { return has_read_iterator_; } + bool has_write_iterator() const { return has_write_iterator_; } + + std::string DebugString() const; + + private: + DISALLOW_COPY_AND_ASSIGN(BufferedTupleStream); + friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test; + friend class ArrayTupleStreamTest_TestComputeRowSize_Test; + friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test; + friend class SimpleTupleStreamTest_TestGetRowsOverflow_Test; + + /// Wrapper around BufferPool::PageHandle that tracks additional info about the page. 
+ struct Page { + Page() : num_rows(0), retrieved_buffer(true) {} + + inline int len() const { return handle.len(); } + inline bool is_pinned() const { return handle.is_pinned(); } + inline int pin_count() const { return handle.pin_count(); } + Status GetBuffer(const BufferPool::BufferHandle** buffer) { + RETURN_IF_ERROR(handle.GetBuffer(buffer)); + retrieved_buffer = true; + return Status::OK(); + } + std::string DebugString() const; + + BufferPool::PageHandle handle; + + /// Number of rows written to the page. + int num_rows; + + /// Whether we called GetBuffer() on the page since it was last pinned. This means + /// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() may have + /// returned rows referencing the page's buffer. + bool retrieved_buffer; + }; + + /// Runtime state instance used to check for cancellation. Not owned. + RuntimeState* const state_; + + /// Description of rows stored in the stream. + const RowDescriptor* desc_; + + /// Plan node ID, used for error reporting. + int node_id_; + + /// The size of the fixed length portion for each tuple in the row. + std::vector<int> fixed_tuple_sizes_; + + /// Vectors of all the strings slots that have their varlen data stored in stream + /// grouped by tuple_idx. + std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_string_slots_; + + /// Vectors of all the collection slots that have their varlen data stored in the + /// stream, grouped by tuple_idx. + std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_coll_slots_; + + /// Buffer pool and client used to allocate, pin and release pages. Not owned. + BufferPool* buffer_pool_; + BufferPool::ClientHandle* buffer_pool_client_; + + /// List of pages in the stream. + /// Empty iff one of two cases applies: + /// * before the first row has been added with AddRow() or AddRowCustom(). 
+ /// * after the stream has been destructively read in 'delete_on_read' mode + std::list<Page> pages_; + // IMPALA-5629: avoid O(n) list.size() call by explicitly tracking the number of pages. + // TODO: remove when we switch to GCC5+, where list.size() is O(1). See GCC bug #49561. + int64_t num_pages_; + + /// Total size of pages_, including any pages already deleted in 'delete_on_read' + /// mode. + int64_t total_byte_size_; + + /// True if there is currently an active read iterator for the stream. + bool has_read_iterator_; + + /// The current page being read. When no read iterator is active, equal to list.end(). + /// When a read iterator is active, either points to the current read page, or equals + /// list.end() if no rows have yet been read. GetNext() does not advance this past + /// the end of the stream, so upon eos 'read_page_' points to the last page and + /// rows_returned_ == num_rows_. Always pinned, unless a Pin() call failed and an error + /// status was returned. + std::list<Page>::iterator read_page_; + + /// Saved reservation for read iterator. 'default_page_len_' reservation is saved if + /// there is a read iterator, no pinned read page, and the possibility that the read + /// iterator will advance to a valid page. + BufferPool::SubReservation read_page_reservation_; + + /// Number of rows returned from the current read_page_. + uint32_t read_page_rows_returned_; + + /// Pointer into read_page_ to the byte after the last row read. + uint8_t* read_ptr_; + + /// Pointer to one byte past the end of read_page_. Used to detect overruns. + const uint8_t* read_end_ptr_; + + /// Pointer into write_page_ to the byte after the last row written. + uint8_t* write_ptr_; + + /// Pointer to one byte past the end of write_page_. Cached to speed up computation + const uint8_t* write_end_ptr_; + + /// Number of rows returned to the caller from GetNext() since the last + /// PrepareForRead() call. 
int64_t rows_returned_;

/// True if there is currently an active write iterator into the stream.
bool has_write_iterator_;

/// The current page for writing. nullptr if there is no write iterator or no current
/// write page. Always pinned. Size is 'default_page_len_', except temporarily while
/// appending a larger row between AddRowCustomBegin() and AddRowCustomEnd().
Page* write_page_;

/// Saved reservation for write iterator. 'default_page_len_' reservation is saved if
/// there is a write iterator, no page currently pinned for writing and the possibility
/// that a pin count will be needed for the write iterator in future. Specifically if:
/// * no rows have been appended to the stream and 'pages_' is empty, or
/// * the stream is unpinned, 'write_page_' is null and the last page in 'pages_'
///   is a large page that we advanced past, or
/// * there is only one pinned page in the stream and it is already pinned for reading.
BufferPool::SubReservation write_page_reservation_;

/// Total bytes of pinned pages in pages_, stored to avoid iterating over the list
/// to compute it.
int64_t bytes_pinned_;

/// Number of rows stored in the stream. Includes rows that were already deleted during
/// a destructive 'delete_on_read' pass over the stream.
int64_t num_rows_;

/// The default length in bytes of pages used to store the stream's rows. All rows that
/// fit in a default-sized page are stored in a default-sized page.
const int64_t default_page_len_;

/// The maximum length in bytes of pages used to store the stream's rows. This is a
/// hard limit on the maximum size of row that can be stored in the stream and the
/// amount of reservation required to read or write to an unpinned stream.
const int64_t max_page_len_;

/// Whether any tuple in the rows is nullable.
const bool has_nullable_tuple_;

/// If true, pages are deleted after they are read during this read pass.
Once rows + /// have been read from a stream with 'delete_on_read_' true, this is always true. + bool delete_on_read_; + + bool closed_; // Used for debugging. + + /// If true, this stream has been explicitly pinned by the caller and all pages are + /// kept pinned until the caller calls UnpinStream(). + bool pinned_; + + bool is_read_page(const Page* page) const { + return read_page_ != pages_.end() && &*read_page_ == page; + } + + bool is_write_page(const Page* page) const { return write_page_ == page; } + + /// Return true if the read and write page are the same. + bool has_read_write_page() const { + return write_page_ != nullptr && is_read_page(write_page_); + } + + /// The slow path for AddRow() that is called if there is not sufficient space in + /// the current page. + bool AddRowSlow(TupleRow* row, Status* status) noexcept; + + /// The slow path for AddRowCustomBegin() that is called if there is not sufficient space in + /// the current page. + uint8_t* AddRowCustomBeginSlow(int64_t size, Status* status) noexcept; + + /// The slow path for AddRowCustomEnd() that is called for large pages. + void AddLargeRowCustomEnd(int64_t size) noexcept; + + /// Copies 'row' into the buffer starting at *data and ending at the byte before + /// 'data_end'. On success, returns true and updates *data to point after the last + /// byte written. Returns false if there is not enough space in the buffer provided. + bool DeepCopy(TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept; + + /// Templated implementation of DeepCopy(). + template <bool HAS_NULLABLE_TUPLE> + bool DeepCopyInternal(TupleRow* row, uint8_t** data, const uint8_t* data_end) noexcept; + + /// Helper function to copy strings in string_slots from tuple into *data. + /// Updates *data to the end of the string data added. Returns false if the data + /// does not fit in the buffer [*data, data_end). 
static bool CopyStrings(const Tuple* tuple,
    const std::vector<SlotDescriptor*>& string_slots, uint8_t** data,
    const uint8_t* data_end);

/// Helper function to deep copy collections in collection_slots from tuple into
/// the buffer [*data, data_end). Updates *data to the end of the collection data
/// added. Returns false if the data does not fit in the buffer.
static bool CopyCollections(const Tuple* tuple,
    const std::vector<SlotDescriptor*>& collection_slots, uint8_t** data,
    const uint8_t* data_end);

/// Gets a new page of 'page_len' bytes from buffer_pool_, updating write_page_,
/// write_ptr_ and write_end_ptr_. The caller must ensure there is 'page_len' unused
/// reservation. The caller must reset the write page (if there is one) before calling.
Status NewWritePage(int64_t page_len) noexcept WARN_UNUSED_RESULT;

/// Determines what page size is needed to fit a row of 'row_size' bytes; the result is
/// presumably returned through 'page_len'. Returns an error if the row cannot fit in a
/// page.
/// NOTE(review): unlike the other Status-returning helpers declared here, this one is
/// not marked WARN_UNUSED_RESULT - confirm whether that is intentional.
Status CalcPageLenForRow(int64_t row_size, int64_t* page_len);

/// Wrapper around NewWritePage() that allocates a new write page that fits a row of
/// 'row_size' bytes. Increases reservation if needed to allocate the next page.
/// Three outcomes:
/// * OK with 'got_reservation' true: the write page was successfully allocated.
/// * OK with 'got_reservation' false: the reservation could not be increased and no
///   other error was encountered.
/// * an error status: the row cannot fit in a page.
Status AdvanceWritePage(
    int64_t row_size, bool* got_reservation) noexcept WARN_UNUSED_RESULT;

/// Reset the write page, if there is one, and unpin pages accordingly. If there
/// is an active write iterator, the next row will be appended to a new page.
void ResetWritePage();

/// Invalidate the write iterator and release any resources associated with it. After
/// calling this, no more rows can be appended to the stream.
void InvalidateWriteIterator();

/// Same as PrepareForRead(), except the iterators are not invalidated and
/// the caller is assumed to have checked there is sufficient unused reservation.
Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT;

/// Pins the next read page. This blocks on reading from disk if necessary to bring
/// the page's data into memory. Updates read_page_, read_ptr_, and
/// read_page_rows_returned_.
Status NextReadPage() WARN_UNUSED_RESULT;

/// Invalidate the read iterator, and release any resources associated with the active
/// iterator.
void InvalidateReadIterator();

/// Returns the total additional bytes that this row will consume in write_page_ if
/// appended to the page. This includes the row's null indicators, the fixed length
/// part of the row and the data for inlined_string_slots_ and inlined_coll_slots_.
int64_t ComputeRowSize(TupleRow* row) const noexcept;

/// Pins 'page' and updates tracking stats.
Status PinPage(Page* page) WARN_UNUSED_RESULT;

/// Increment the page's pin count if this page needs a higher pin count given the
/// current read and write iterator positions and whether the stream will be pinned
/// ('stream_pinned'). Assumes that no scenario occurs in which the pin count needs to
/// be incremented multiple times. The caller is responsible for ensuring sufficient
/// reservation is available.
Status PinPageIfNeeded(Page* page, bool stream_pinned) WARN_UNUSED_RESULT;

/// Decrement the page's pin count if this page needs a lower pin count given the
/// current read and write iterator positions and whether the stream will be pinned
/// ('stream_pinned'). Assumes that no scenario occurs in which the pin count needs to
/// be decremented multiple times.
void UnpinPageIfNeeded(Page* page, bool stream_pinned);

/// Return the expected pin count for 'page' in the current stream based on the current
/// read and write pages and whether the stream is pinned.
/// NOTE(review): takes (stream_pinned, page) whereas PinPageIfNeeded() and
/// UnpinPageIfNeeded() take (page, stream_pinned) - easy to transpose at call sites.
int ExpectedPinCount(bool stream_pinned, const Page* page) const;

/// Return true if the stream in its current state needs to have a reservation for
/// a write page stored in 'write_page_reservation_'.
bool NeedWriteReservation() const;

/// Same as NeedWriteReservation(), except assume the stream's 'pinned_' state is
/// 'stream_pinned'.
bool NeedWriteReservation(bool stream_pinned) const;

/// Same as NeedWriteReservation(bool), except assume the stream has 'num_pages' pages
/// and the iterator state given by the remaining arguments. Static: uses no member
/// state.
static bool NeedWriteReservation(bool stream_pinned, int64_t num_pages,
    bool has_write_iterator, bool has_write_page, bool has_read_write_page);

/// Return true if the stream in its current state needs to have a reservation for
/// a read page stored in 'read_page_reservation_'.
bool NeedReadReservation() const;

/// Same as NeedReadReservation(), except assume the stream's 'pinned_' state is
/// 'stream_pinned'.
bool NeedReadReservation(bool stream_pinned) const;

/// Same as NeedReadReservation(bool), except assume the stream has 'num_pages' pages
/// and a different read iterator state.
bool NeedReadReservation(bool stream_pinned, int64_t num_pages, bool has_read_iterator,
    bool has_read_page) const;

/// Same as the four-argument NeedReadReservation(), except also assume a different
/// write iterator state. Static: uses no member state.
static bool NeedReadReservation(bool stream_pinned, int64_t num_pages,
    bool has_read_iterator, bool has_read_page, bool has_write_iterator,
    bool has_write_page);

/// Templated GetNext implementations.
+ template <bool FILL_FLAT_ROWS> + Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows); + template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE> + Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows); + + /// Helper function to convert a flattened TupleRow stored starting at '*data' into + /// 'row'. *data is updated to point to the first byte past the end of the row. + template <bool HAS_NULLABLE_TUPLE> + void UnflattenTupleRow(uint8_t** data, TupleRow* row) const; + + /// Helper function for GetNextInternal(). For each string slot in string_slots, + /// update StringValue's ptr field to point to the corresponding string data stored + /// inline in the stream (at the current value of read_ptr_) advance read_ptr_ by the + /// StringValue's length field. + void FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots, Tuple* tuple); + + /// Helper function for GetNextInternal(). For each collection slot in collection_slots, + /// recursively update any pointers in the CollectionValue to point to the corresponding + /// var len data stored inline in the stream, advancing read_ptr_ as data is read. + /// Assumes that the collection was serialized to the stream in DeepCopy()'s format. + void FixUpCollectionsForRead( + const vector<SlotDescriptor*>& collection_slots, Tuple* tuple); + + /// Returns the number of null indicator bytes per row. Only valid if this stream has + /// nullable tuples. + int NullIndicatorBytesPerRow() const; + + /// Returns the total bytes pinned. Only called in DCHECKs to validate bytes_pinned_. + int64_t CalcBytesPinned() const; + + /// DCHECKs if the stream is internally inconsistent. The stream should always be in + /// a consistent state after returning success from a public API call. The Fast version + /// has constant runtime and does not check all of 'pages_'. The Full version includes + /// O(n) checks that require iterating over the whole 'pages_' list (e.g. 
checking that + /// each page is in a valid state). + void CheckConsistencyFast() const; + void CheckConsistencyFull() const; + void CheckPageConsistency(const Page* page) const; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/0c46147e/be/src/runtime/buffered-tuple-stream.inline.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream.inline.h b/be/src/runtime/buffered-tuple-stream.inline.h new file mode 100644 index 0000000..2e1aad7 --- /dev/null +++ b/be/src/runtime/buffered-tuple-stream.inline.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_INLINE_H +#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_INLINE_H + +#include "runtime/buffered-tuple-stream.h" + +#include "runtime/descriptors.h" +#include "runtime/tuple-row.h" +#include "util/bit-util.h" + +namespace impala { + +inline int BufferedTupleStream::NullIndicatorBytesPerRow() const { + DCHECK(has_nullable_tuple_); + return BitUtil::RoundUpNumBytes(fixed_tuple_sizes_.size()); +} + +inline uint8_t* BufferedTupleStream::AddRowCustomBegin(int64_t size, Status* status) { + DCHECK(!closed_); + DCHECK(has_write_iterator()); + if (UNLIKELY(write_page_ == nullptr || write_ptr_ + size > write_end_ptr_)) { + return AddRowCustomBeginSlow(size, status); + } + DCHECK(write_page_ != nullptr); + DCHECK(write_page_->is_pinned()); + DCHECK_LE(write_ptr_ + size, write_end_ptr_); + ++num_rows_; + ++write_page_->num_rows; + + uint8_t* data = write_ptr_; + write_ptr_ += size; + return data; +} + +inline void BufferedTupleStream::AddRowCustomEnd(int64_t size) { + if (UNLIKELY(size > default_page_len_)) AddLargeRowCustomEnd(size); +} +} + +#endif
