http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/buffered-tuple-stream-v2.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc new file mode 100644 index 0000000..083b59e --- /dev/null +++ b/be/src/runtime/buffered-tuple-stream-v2.cc @@ -0,0 +1,812 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/buffered-tuple-stream-v2.inline.h" + +#include <boost/bind.hpp> +#include <gutil/strings/substitute.h> + +#include "runtime/bufferpool/reservation-tracker.h" +#include "runtime/collection-value.h" +#include "runtime/descriptors.h" +#include "runtime/exec-env.h" +#include "runtime/mem-tracker.h" +#include "runtime/row-batch.h" +#include "runtime/runtime-state.h" +#include "runtime/string-value.h" +#include "runtime/tuple-row.h" +#include "util/bit-util.h" +#include "util/debug-util.h" +#include "util/runtime-profile-counters.h" + +#include "common/names.h" + +#ifdef NDEBUG +#define CHECK_CONSISTENCY() +#else +#define CHECK_CONSISTENCY() CheckConsistency() +#endif + +using namespace impala; +using namespace strings; + +BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state, + const RowDescriptor& row_desc, BufferPool::ClientHandle* buffer_pool_client, + int64_t page_len, const set<SlotId>& ext_varlen_slots) + : state_(state), + desc_(row_desc), + buffer_pool_(state->exec_env()->buffer_pool()), + buffer_pool_client_(buffer_pool_client), + total_byte_size_(0), + read_page_rows_returned_(-1), + read_ptr_(nullptr), + write_ptr_(nullptr), + write_end_ptr_(nullptr), + rows_returned_(0), + write_page_(nullptr), + bytes_pinned_(0), + num_rows_(0), + page_len_(page_len), + has_nullable_tuple_(row_desc.IsAnyTupleNullable()), + delete_on_read_(false), + closed_(false), + pinned_(true) { + read_page_ = pages_.end(); + fixed_tuple_row_size_ = 0; + for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) { + const TupleDescriptor* tuple_desc = desc_.tuple_descriptors()[i]; + const int tuple_byte_size = tuple_desc->byte_size(); + fixed_tuple_sizes_.push_back(tuple_byte_size); + fixed_tuple_row_size_ += tuple_byte_size; + + vector<SlotDescriptor*> tuple_string_slots; + vector<SlotDescriptor*> tuple_coll_slots; + for (int j = 0; j < tuple_desc->slots().size(); ++j) { + SlotDescriptor* slot = tuple_desc->slots()[j]; + if (!slot->type().IsVarLenType()) continue; + if (ext_varlen_slots.find(slot->id()) == ext_varlen_slots.end()) { + if (slot->type().IsVarLenStringType()) { + tuple_string_slots.push_back(slot); + } else { + DCHECK(slot->type().IsCollectionType()); + tuple_coll_slots.push_back(slot); + } + } + } + if (!tuple_string_slots.empty()) { + 
inlined_string_slots_.push_back(make_pair(i, tuple_string_slots)); + } + + if (!tuple_coll_slots.empty()) { + inlined_coll_slots_.push_back(make_pair(i, tuple_coll_slots)); + } + } + if (has_nullable_tuple_) fixed_tuple_row_size_ += NullIndicatorBytesPerRow(); +} + +BufferedTupleStreamV2::~BufferedTupleStreamV2() { + DCHECK(closed_); +} + +void BufferedTupleStreamV2::CheckConsistency() const { + DCHECK_EQ(bytes_pinned_, CalcBytesPinned()) << DebugString(); + for (const Page& page : pages_) { + DCHECK_EQ(ExpectedPinCount(pinned_, &page), page.pin_count()) << DebugString(); + } + if (has_write_iterator()) { + DCHECK(write_page_->is_pinned()); + DCHECK_GE(write_ptr_, write_page_->data()); + DCHECK_EQ(write_end_ptr_, write_page_->data() + write_page_->len()); + DCHECK_GE(write_end_ptr_, write_ptr_); + } + if (has_read_iterator()) { + DCHECK(read_page_->is_pinned()); + uint8_t* read_end_ptr = read_page_->data() + read_page_->len(); + DCHECK_GE(read_ptr_, read_page_->data()); + DCHECK_GE(read_end_ptr, read_ptr_); + } +} + +string BufferedTupleStreamV2::DebugString() const { + stringstream ss; + ss << "BufferedTupleStreamV2 num_rows=" << num_rows_ + << " rows_returned=" << rows_returned_ << " pinned=" << pinned_ + << " delete_on_read=" << delete_on_read_ << " closed=" << closed_ + << " bytes_pinned=" << bytes_pinned_ << " write_page=" << write_page_ + << " read_page="; + if (!has_read_iterator()) { + ss << "<end>"; + } else { + ss << &*read_page_; + } + ss << " pages=[\n"; + for (const Page& page : pages_) { + ss << "{" << page.DebugString() << "}"; + if (&page != &pages_.back()) ss << ",\n"; + } + ss << "]"; + return ss.str(); +} + +string BufferedTupleStreamV2::Page::DebugString() const { + return Substitute("$0 num_rows=$1", handle.DebugString(), num_rows); +} + +Status BufferedTupleStreamV2::Init(int node_id, bool pinned) { + if (!pinned) UnpinStream(UNPIN_ALL_EXCEPT_CURRENT); + return Status::OK(); +} + +Status BufferedTupleStreamV2::PrepareForWrite(bool* got_reservation) { + // This must be the first iterator created. + DCHECK(pages_.empty()); + DCHECK(!delete_on_read_); + DCHECK(!has_write_iterator()); + DCHECK(!has_read_iterator()); + CHECK_CONSISTENCY(); + + RETURN_IF_ERROR(CheckPageSizeForRow(fixed_tuple_row_size_)); + *got_reservation = buffer_pool_client_->IncreaseReservationToFit(page_len_); + if (!*got_reservation) return Status::OK(); + RETURN_IF_ERROR(NewWritePage()); + CHECK_CONSISTENCY(); + return Status::OK(); +} + +Status BufferedTupleStreamV2::PrepareForReadWrite( + bool delete_on_read, bool* got_reservation) { + // This must be the first iterator created. 
+ DCHECK(pages_.empty()); + DCHECK(!delete_on_read_); + DCHECK(!has_write_iterator()); + DCHECK(!has_read_iterator()); + CHECK_CONSISTENCY(); + + RETURN_IF_ERROR(CheckPageSizeForRow(fixed_tuple_row_size_)); + *got_reservation = buffer_pool_client_->IncreaseReservationToFit(2 * page_len_); + if (!*got_reservation) return Status::OK(); + RETURN_IF_ERROR(NewWritePage()); + RETURN_IF_ERROR(PrepareForReadInternal(delete_on_read)); + return Status::OK(); +} + +void BufferedTupleStreamV2::Close(RowBatch* batch, RowBatch::FlushMode flush) { + for (Page& page : pages_) { + if (batch != nullptr && page.is_pinned()) { + BufferPool::BufferHandle buffer; + buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer); + batch->AddBuffer(buffer_pool_client_, move(buffer), flush); + } else { + buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle); + } + } + pages_.clear(); + bytes_pinned_ = 0; + closed_ = true; +} + +int64_t BufferedTupleStreamV2::CalcBytesPinned() const { + int64_t result = 0; + for (const Page& page : pages_) result += page.pin_count() * page.len(); + return result; +} + +Status BufferedTupleStreamV2::PinPage(Page* page) { + RETURN_IF_ERROR(buffer_pool_->Pin(buffer_pool_client_, &page->handle)); + bytes_pinned_ += page->len(); + return Status::OK(); +} + +int BufferedTupleStreamV2::ExpectedPinCount(bool stream_pinned, const Page* page) const { + int pin_count = 0; + if (stream_pinned && has_write_iterator() && has_read_iterator()) { + // The stream is pinned, so all pages have a pin for that (and this pin will be used + // as the read iterator when the stream is unpinned) + pin_count++; + // The write iterator gets it's own pin so that we can unpin the stream without + // needing additional reservation. + if (is_write_page(page)) pin_count++; + } else if (stream_pinned) { + // The stream is pinned and only has one iterator. When it's unpinned, either the read + // or write iterator can use this pin count. + pin_count++; + } else { + // The stream is unpinned. Each iterator gets a pin count. + if (is_read_page(page)) pin_count++; + if (is_write_page(page)) pin_count++; + } + return pin_count; +} + +Status BufferedTupleStreamV2::PinPageIfNeeded(Page* page, bool stream_pinned) { + int new_pin_count = ExpectedPinCount(stream_pinned, page); + if (new_pin_count != page->pin_count()) { + DCHECK_EQ(new_pin_count, page->pin_count() + 1); + RETURN_IF_ERROR(PinPage(page)); + } + return Status::OK(); +} + +void BufferedTupleStreamV2::UnpinPageIfNeeded(Page* page, bool stream_pinned) { + int new_pin_count = ExpectedPinCount(stream_pinned, page); + if (new_pin_count != page->pin_count()) { + DCHECK_EQ(new_pin_count, page->pin_count() - 1); + buffer_pool_->Unpin(buffer_pool_client_, &page->handle); + bytes_pinned_ -= page->len(); + } +} + +Status BufferedTupleStreamV2::NewWritePage() noexcept { + DCHECK(!closed_); + DCHECK(!has_write_iterator()); + + Page new_page; + RETURN_IF_ERROR( + buffer_pool_->CreatePage(buffer_pool_client_, page_len_, &new_page.handle)); + bytes_pinned_ += page_len_; + total_byte_size_ += page_len_; + + pages_.push_back(std::move(new_page)); + write_page_ = &pages_.back(); + DCHECK_EQ(write_page_->num_rows, 0); + write_ptr_ = write_page_->data(); + write_end_ptr_ = write_page_->data() + page_len_; + return Status::OK(); +} + +Status BufferedTupleStreamV2::CheckPageSizeForRow(int64_t row_size) { + // TODO: IMPALA-3208: need to rework this logic to support large pages - should pick + // next power-of-two size. 
+ if (UNLIKELY(row_size > page_len_)) { + // TODO: IMPALA-3208: change the message to reference the query option controlling + // max row size. + return Status(TErrorCode::BTS_BLOCK_OVERFLOW, + PrettyPrinter::Print(row_size, TUnit::BYTES), + PrettyPrinter::Print(0, TUnit::BYTES)); + } + return Status::OK(); +} + +Status BufferedTupleStreamV2::AdvanceWritePage( + int64_t row_size, bool* got_reservation) noexcept { + CHECK_CONSISTENCY(); + + // Get ready to move to the next write page by unsetting 'write_page_' and + // potentially (depending on the mode of this stream) freeing up reservation for the + // next write page. + ResetWritePage(); + + RETURN_IF_ERROR(CheckPageSizeForRow(row_size)); + // May need to pin the new page for both reading and writing. See ExpectedPinCount(); + bool pin_for_read = has_read_iterator() && pinned_; + int64_t new_page_reservation = pin_for_read ? 2 * page_len_ : page_len_; + if (!buffer_pool_client_->IncreaseReservationToFit(new_page_reservation)) { + *got_reservation = false; + return Status::OK(); + } + RETURN_IF_ERROR(NewWritePage()); + // We may need to pin the page for reading also. + if (pin_for_read) RETURN_IF_ERROR(PinPage(write_page_)); + + CHECK_CONSISTENCY(); + *got_reservation = true; + return Status::OK(); +} + +void BufferedTupleStreamV2::ResetWritePage() { + if (!has_write_iterator()) return; + // Unpin the write page if we're reading in unpinned mode. + Page* prev_write_page = write_page_; + write_page_ = nullptr; + + // May need to decrement pin count now that it's not the write page, depending on + // the stream's mode. + UnpinPageIfNeeded(prev_write_page, pinned_); +} + +Status BufferedTupleStreamV2::NextReadPage() { + DCHECK(!closed_); + CHECK_CONSISTENCY(); + + if (delete_on_read_) { + DCHECK(read_page_ == pages_.begin()) << read_page_->DebugString() << " " << DebugString(); + DCHECK_NE(&*read_page_, write_page_); + bytes_pinned_ -= pages_.front().len(); + buffer_pool_->DestroyPage(buffer_pool_client_, &pages_.front().handle); + pages_.pop_front(); + read_page_ = pages_.begin(); + } else { + // Unpin pages after reading them if needed. + Page* prev_read_page = &*read_page_; + ++read_page_; + UnpinPageIfNeeded(prev_read_page, pinned_); + } + + if (!has_read_iterator()) { + CHECK_CONSISTENCY(); + return Status::OK(); + } + + // Ensure the next page is pinned for reading. If the stream is unpinned, we freed up + // enough reservation by deleting or unpinning the previous page. + // TODO: IMPALA-3208: this page may be larger than the previous, so this could + // actually fail once we have variable-length pages. + RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_)); + + read_page_rows_returned_ = 0; + read_ptr_ = read_page_->data(); + + CHECK_CONSISTENCY(); + return Status::OK(); +} + +void BufferedTupleStreamV2::ResetReadPage() { + if (!has_read_iterator()) return; + // Unpin the write page if we're reading in unpinned mode. + Page* prev_read_page = &*read_page_; + read_page_ = pages_.end(); + + // May need to decrement pin count after destroying read iterator. + UnpinPageIfNeeded(prev_read_page, pinned_); +} + +Status BufferedTupleStreamV2::PrepareForRead(bool delete_on_read, bool* got_reservation) { + CHECK_CONSISTENCY(); + ResetWritePage(); + ResetReadPage(); + // If already pinned, no additional pin is needed (see ExpectedPinCount()). 
+ *got_reservation = pinned_ || buffer_pool_client_->IncreaseReservationToFit(page_len_); + if (!*got_reservation) return Status::OK(); + return PrepareForReadInternal(delete_on_read); +} + +Status BufferedTupleStreamV2::PrepareForReadInternal(bool delete_on_read) { + DCHECK(!closed_); + DCHECK(!delete_on_read_); + DCHECK(!pages_.empty()); + DCHECK(!has_read_iterator()); + + // Check if we need to increment the pin count of the read page. + read_page_ = pages_.begin(); + RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_)); + + DCHECK(has_read_iterator()); + DCHECK(read_page_->is_pinned()); + read_page_rows_returned_ = 0; + read_ptr_ = read_page_->data(); + rows_returned_ = 0; + delete_on_read_ = delete_on_read; + CHECK_CONSISTENCY(); + return Status::OK(); +} + +Status BufferedTupleStreamV2::PinStream(bool* pinned) { + DCHECK(!closed_); + CHECK_CONSISTENCY(); + if (pinned_) { + *pinned = true; + return Status::OK(); + } + *pinned = false; + // First, make sure we have the reservation to pin all the pages for reading. + int64_t bytes_to_pin = 0; + for (Page& page : pages_) { + bytes_to_pin += (ExpectedPinCount(true, &page) - page.pin_count()) * page.len(); + } + bool reservation_granted = buffer_pool_client_->IncreaseReservationToFit(bytes_to_pin); + if (!reservation_granted) return Status::OK(); + + // At this point success is guaranteed - go through to pin the pages we need to pin. + for (Page& page : pages_) RETURN_IF_ERROR(PinPageIfNeeded(&page, true)); + + pinned_ = true; + *pinned = true; + CHECK_CONSISTENCY(); + return Status::OK(); +} + +void BufferedTupleStreamV2::UnpinStream(UnpinMode mode) { + DCHECK(!closed_); + if (mode == UNPIN_ALL) { + // Invalidate the iterators so they don't keep pages pinned. + ResetWritePage(); + ResetReadPage(); + } + + if (pinned_) { + // If the stream was pinned, there may be some remaining pinned pages that should + // be unpinned at this point. + for (Page& page : pages_) UnpinPageIfNeeded(&page, false); + pinned_ = false; + } + CHECK_CONSISTENCY(); +} + +Status BufferedTupleStreamV2::GetRows( + MemTracker* tracker, scoped_ptr<RowBatch>* batch, bool* got_rows) { + if (num_rows() > numeric_limits<int>::max()) { + // RowBatch::num_rows_ is a 32-bit int, avoid an overflow. + return Status(Substitute("Trying to read $0 rows into in-memory batch failed. Limit " + "is $1", + num_rows(), numeric_limits<int>::max())); + } + RETURN_IF_ERROR(PinStream(got_rows)); + if (!*got_rows) return Status::OK(); + bool got_reservation; + RETURN_IF_ERROR(PrepareForRead(false, &got_reservation)); + DCHECK(got_reservation) << "Stream was pinned"; + batch->reset(new RowBatch(desc_, num_rows(), tracker)); + bool eos = false; + // Loop until GetNext fills the entire batch. Each call can stop at page + // boundaries. We generally want it to stop, so that pages can be freed + // as we read. It is safe in this case because we pin the entire stream. 
+ while (!eos) { + RETURN_IF_ERROR(GetNext(batch->get(), &eos)); + } + return Status::OK(); +} + +Status BufferedTupleStreamV2::GetNext(RowBatch* batch, bool* eos) { + return GetNextInternal<false>(batch, eos, nullptr); +} + +Status BufferedTupleStreamV2::GetNext( + RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) { + return GetNextInternal<true>(batch, eos, flat_rows); +} + +template <bool FILL_FLAT_ROWS> +Status BufferedTupleStreamV2::GetNextInternal( + RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) { + if (has_nullable_tuple_) { + return GetNextInternal<FILL_FLAT_ROWS, true>(batch, eos, flat_rows); + } else { + return GetNextInternal<FILL_FLAT_ROWS, false>(batch, eos, flat_rows); + } +} + +template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE> +Status BufferedTupleStreamV2::GetNextInternal( + RowBatch* batch, bool* eos, vector<FlatRowPtr>* flat_rows) { + DCHECK(!closed_); + DCHECK(batch->row_desc().Equals(desc_)); + DCHECK(is_pinned() || !FILL_FLAT_ROWS) + << "FlatRowPtrs are only valid for pinned streams"; + *eos = (rows_returned_ == num_rows_); + if (*eos) return Status::OK(); + + if (UNLIKELY(read_page_rows_returned_ == read_page_->num_rows)) { + // Get the next page in the stream. We need to do this at the beginning of the + // GetNext() call to ensure the buffer management semantics. NextReadPage() may + // unpin or delete the buffer backing the rows returned from the *previous* call + // to GetNext(). + RETURN_IF_ERROR(NextReadPage()); + } + + DCHECK(has_read_iterator()); + DCHECK(read_page_->is_pinned()) << DebugString(); + DCHECK_GE(read_page_rows_returned_, 0); + + int rows_left_in_page = read_page_->num_rows - read_page_rows_returned_; + int rows_to_fill = std::min(batch->capacity() - batch->num_rows(), rows_left_in_page); + DCHECK_GE(rows_to_fill, 1); + uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(batch->GetRow(batch->num_rows())); + + // Produce tuple rows from the current page and the corresponding position on the + // null tuple indicator. + if (FILL_FLAT_ROWS) { + DCHECK(flat_rows != nullptr); + DCHECK(!delete_on_read_); + DCHECK_EQ(batch->num_rows(), 0); + flat_rows->clear(); + flat_rows->reserve(rows_to_fill); + } + + const uint64_t tuples_per_row = desc_.tuple_descriptors().size(); + // Start reading from the current position in 'read_page_'. + for (int i = 0; i < rows_to_fill; ++i) { + if (FILL_FLAT_ROWS) { + flat_rows->push_back(read_ptr_); + DCHECK_EQ(flat_rows->size(), i + 1); + } + // Copy the row into the output batch. + TupleRow* output_row = reinterpret_cast<TupleRow*>(tuple_row_mem); + tuple_row_mem += sizeof(Tuple*) * tuples_per_row; + UnflattenTupleRow<HAS_NULLABLE_TUPLE>(&read_ptr_, output_row); + + // Update string slot ptrs, skipping external strings. + for (int j = 0; j < inlined_string_slots_.size(); ++j) { + Tuple* tuple = output_row->GetTuple(inlined_string_slots_[j].first); + if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue; + FixUpStringsForRead(inlined_string_slots_[j].second, tuple); + } + + // Update collection slot ptrs, skipping external collections. We traverse the + // collection structure in the same order as it was written to the stream, allowing + // us to infer the data layout based on the length of collections and strings. 
+ for (int j = 0; j < inlined_coll_slots_.size(); ++j) { + Tuple* tuple = output_row->GetTuple(inlined_coll_slots_[j].first); + if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue; + FixUpCollectionsForRead(inlined_coll_slots_[j].second, tuple); + } + } + + batch->CommitRows(rows_to_fill); + rows_returned_ += rows_to_fill; + read_page_rows_returned_ += rows_to_fill; + *eos = (rows_returned_ == num_rows_); + if (read_page_rows_returned_ == read_page_->num_rows && (!pinned_ || delete_on_read_)) { + // No more data in this page. The batch must be immediately returned up the operator + // tree and deep copied so that NextReadPage() can reuse the read page's buffer. + // TODO: IMPALA-4179 - instead attach the buffer and flush the resources. + batch->MarkNeedsDeepCopy(); + } + if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill); + DCHECK_LE(read_ptr_, read_page_->data() + read_page_->len()); + return Status::OK(); +} + +void BufferedTupleStreamV2::FixUpStringsForRead( + const vector<SlotDescriptor*>& string_slots, Tuple* tuple) { + DCHECK(tuple != nullptr); + for (const SlotDescriptor* slot_desc : string_slots) { + if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; + + StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset()); + DCHECK_LE(read_ptr_ + sv->len, read_page_->data() + read_page_->len()); + sv->ptr = reinterpret_cast<char*>(read_ptr_); + read_ptr_ += sv->len; + } +} + +void BufferedTupleStreamV2::FixUpCollectionsForRead( + const vector<SlotDescriptor*>& collection_slots, Tuple* tuple) { + DCHECK(tuple != nullptr); + for (const SlotDescriptor* slot_desc : collection_slots) { + if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; + + CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset()); + const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor(); + int coll_byte_size = cv->num_tuples * item_desc.byte_size(); + DCHECK_LE(read_ptr_ + coll_byte_size, read_page_->data() + read_page_->len()); + cv->ptr = reinterpret_cast<uint8_t*>(read_ptr_); + read_ptr_ += coll_byte_size; + + if (!item_desc.HasVarlenSlots()) continue; + uint8_t* coll_data = cv->ptr; + for (int i = 0; i < cv->num_tuples; ++i) { + Tuple* item = reinterpret_cast<Tuple*>(coll_data); + FixUpStringsForRead(item_desc.string_slots(), item); + FixUpCollectionsForRead(item_desc.collection_slots(), item); + coll_data += item_desc.byte_size(); + } + } +} + +int64_t BufferedTupleStreamV2::ComputeRowSize(TupleRow* row) const noexcept { + int64_t size = 0; + if (has_nullable_tuple_) { + size += NullIndicatorBytesPerRow(); + for (int i = 0; i < fixed_tuple_sizes_.size(); ++i) { + if (row->GetTuple(i) != nullptr) size += fixed_tuple_sizes_[i]; + } + } else { + size = fixed_tuple_row_size_; + } + for (int i = 0; i < inlined_string_slots_.size(); ++i) { + Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first); + if (tuple == nullptr) continue; + const vector<SlotDescriptor*>& slots = inlined_string_slots_[i].second; + for (auto it = slots.begin(); it != slots.end(); ++it) { + if (tuple->IsNull((*it)->null_indicator_offset())) continue; + size += tuple->GetStringSlot((*it)->tuple_offset())->len; + } + } + + for (int i = 0; i < inlined_coll_slots_.size(); ++i) { + Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first); + if (tuple == nullptr) continue; + const vector<SlotDescriptor*>& slots = inlined_coll_slots_[i].second; + for (auto it = slots.begin(); it != slots.end(); ++it) { + if (tuple->IsNull((*it)->null_indicator_offset())) continue; + 
CollectionValue* cv = tuple->GetCollectionSlot((*it)->tuple_offset()); + const TupleDescriptor& item_desc = *(*it)->collection_item_descriptor(); + size += cv->num_tuples * item_desc.byte_size(); + + if (!item_desc.HasVarlenSlots()) continue; + for (int j = 0; j < cv->num_tuples; ++j) { + Tuple* item = reinterpret_cast<Tuple*>(&cv->ptr[j * item_desc.byte_size()]); + size += item->VarlenByteSize(item_desc); + } + } + } + return size; +} + +bool BufferedTupleStreamV2::AddRowSlow(TupleRow* row, Status* status) noexcept { + bool got_reservation; + *status = AdvanceWritePage(ComputeRowSize(row), &got_reservation); + if (!status->ok() || !got_reservation) return false; + return DeepCopy(row); +} + +uint8_t* BufferedTupleStreamV2::AllocateRowSlow( + int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) noexcept { + int64_t row_size = static_cast<int64_t>(fixed_size) + varlen_size; + bool got_reservation; + *status = AdvanceWritePage(row_size, &got_reservation); + if (!status->ok() || !got_reservation) return nullptr; + + // We have a large-enough page so now success is guaranteed. + uint8_t* result = AllocateRow(fixed_size, varlen_size, varlen_data, status); + DCHECK(result != nullptr); + return result; +} + +bool BufferedTupleStreamV2::DeepCopy(TupleRow* row) noexcept { + if (has_nullable_tuple_) { + return DeepCopyInternal<true>(row); + } else { + return DeepCopyInternal<false>(row); + } +} + +// TODO: consider codegening this. +// TODO: in case of duplicate tuples, this can redundantly serialize data. +template <bool HAS_NULLABLE_TUPLE> +bool BufferedTupleStreamV2::DeepCopyInternal(TupleRow* row) noexcept { + if (UNLIKELY(write_page_ == nullptr)) return false; + DCHECK(write_page_->is_pinned()) << DebugString() << std::endl + << write_page_->DebugString(); + + const uint64_t tuples_per_row = desc_.tuple_descriptors().size(); + uint32_t bytes_remaining = write_end_ptr_ - write_ptr_; + + // Move to the next page we may not have enough space to append the fixed-length part + // of the row. + if (UNLIKELY((bytes_remaining < fixed_tuple_row_size_))) return false; + + // Copy the not NULL fixed len tuples. For the NULL tuples just update the NULL tuple + // indicator. + if (HAS_NULLABLE_TUPLE) { + uint8_t* null_indicators = write_ptr_; + int null_indicator_bytes = NullIndicatorBytesPerRow(); + memset(null_indicators, 0, null_indicator_bytes); + write_ptr_ += NullIndicatorBytesPerRow(); + for (int i = 0; i < tuples_per_row; ++i) { + uint8_t* null_word = null_indicators + (i >> 3); + const uint32_t null_pos = i & 7; + const int tuple_size = fixed_tuple_sizes_[i]; + Tuple* t = row->GetTuple(i); + const uint8_t mask = 1 << (7 - null_pos); + if (t != nullptr) { + memcpy(write_ptr_, t, tuple_size); + write_ptr_ += tuple_size; + } else { + *null_word |= mask; + } + } + } else { + // If we know that there are no nullable tuples no need to set the nullability flags. + for (int i = 0; i < tuples_per_row; ++i) { + const int tuple_size = fixed_tuple_sizes_[i]; + Tuple* t = row->GetTuple(i); + // TODO: Once IMPALA-1306 (Avoid passing empty tuples of non-materialized slots) + // is delivered, the check below should become DCHECK(t != nullptr). + DCHECK(t != nullptr || tuple_size == 0); + memcpy(write_ptr_, t, tuple_size); + write_ptr_ += tuple_size; + } + } + + // Copy inlined string slots. Note: we do not need to convert the string ptrs to offsets + // on the write path, only on the read. The tuple data is immediately followed + // by the string data so only the len information is necessary. 
+ for (int i = 0; i < inlined_string_slots_.size(); ++i) { + const Tuple* tuple = row->GetTuple(inlined_string_slots_[i].first); + if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue; + if (UNLIKELY(!CopyStrings(tuple, inlined_string_slots_[i].second))) return false; + } + + // Copy inlined collection slots. We copy collection data in a well-defined order so + // we do not need to convert pointers to offsets on the write path. + for (int i = 0; i < inlined_coll_slots_.size(); ++i) { + const Tuple* tuple = row->GetTuple(inlined_coll_slots_[i].first); + if (HAS_NULLABLE_TUPLE && tuple == nullptr) continue; + if (UNLIKELY(!CopyCollections(tuple, inlined_coll_slots_[i].second))) return false; + } + + ++num_rows_; + ++write_page_->num_rows; + return true; +} + +bool BufferedTupleStreamV2::CopyStrings( + const Tuple* tuple, const vector<SlotDescriptor*>& string_slots) { + for (const SlotDescriptor* slot_desc : string_slots) { + if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; + const StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset()); + if (LIKELY(sv->len > 0)) { + if (UNLIKELY(write_ptr_ + sv->len > write_end_ptr_)) return false; + + memcpy(write_ptr_, sv->ptr, sv->len); + write_ptr_ += sv->len; + } + } + return true; +} + +bool BufferedTupleStreamV2::CopyCollections( + const Tuple* tuple, const vector<SlotDescriptor*>& collection_slots) { + for (const SlotDescriptor* slot_desc : collection_slots) { + if (tuple->IsNull(slot_desc->null_indicator_offset())) continue; + const CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset()); + const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor(); + if (LIKELY(cv->num_tuples > 0)) { + int coll_byte_size = cv->num_tuples * item_desc.byte_size(); + if (UNLIKELY(write_ptr_ + coll_byte_size > write_end_ptr_)) return false; + uint8_t* coll_data = write_ptr_; + memcpy(coll_data, cv->ptr, coll_byte_size); + write_ptr_ += coll_byte_size; + + if (!item_desc.HasVarlenSlots()) continue; + // Copy variable length data when present in collection items. + for (int i = 0; i < cv->num_tuples; ++i) { + const Tuple* item = reinterpret_cast<Tuple*>(coll_data); + if (UNLIKELY(!CopyStrings(item, item_desc.string_slots()))) return false; + if (UNLIKELY(!CopyCollections(item, item_desc.collection_slots()))) return false; + coll_data += item_desc.byte_size(); + } + } + } + return true; +} + +void BufferedTupleStreamV2::GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const { + DCHECK(row != nullptr); + DCHECK(!closed_); + DCHECK(is_pinned()); + DCHECK(!delete_on_read_); + uint8_t* data = flat_row; + return has_nullable_tuple_ ? UnflattenTupleRow<true>(&data, row) : + UnflattenTupleRow<false>(&data, row); +} + +template <bool HAS_NULLABLE_TUPLE> +void BufferedTupleStreamV2::UnflattenTupleRow(uint8_t** data, TupleRow* row) const { + const int tuples_per_row = desc_.tuple_descriptors().size(); + uint8_t* ptr = *data; + if (has_nullable_tuple_) { + // Stitch together the tuples from the page and the NULL ones. 
+ const uint8_t* null_indicators = ptr; + ptr += NullIndicatorBytesPerRow(); + for (int i = 0; i < tuples_per_row; ++i) { + const uint8_t* null_word = null_indicators + (i >> 3); + const uint32_t null_pos = i & 7; + const bool is_not_null = ((*null_word & (1 << (7 - null_pos))) == 0); + row->SetTuple( + i, reinterpret_cast<Tuple*>(reinterpret_cast<uint64_t>(ptr) * is_not_null)); + ptr += fixed_tuple_sizes_[i] * is_not_null; + } + } else { + for (int i = 0; i < tuples_per_row; ++i) { + row->SetTuple(i, reinterpret_cast<Tuple*>(ptr)); + ptr += fixed_tuple_sizes_[i]; + } + } + *data = ptr; +}
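The row format that DeepCopyInternal() writes and UnflattenTupleRow() reads back above can be illustrated with a short standalone sketch. This is not Impala code from the patch — FlattenRow/UnflattenRow and the raw byte buffers are hypothetical stand-ins — but it follows the layout described in the implementation: a null-indicator bitstring of ceil(num_tuples / 8) bytes (bit i set means tuple i is NULL; only present when a tuple in the row is nullable, and always written here for simplicity), followed by the fixed-length portion of each non-NULL tuple back to back, with NULL tuples occupying no space.

// Minimal standalone sketch (stand-in types, not Impala classes) of the
// flattened row layout used by DeepCopyInternal()/UnflattenTupleRow().
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Serialize one row: null-indicator bitstring, then each non-NULL tuple's
// fixed-length bytes. A NULL tuple contributes only its indicator bit.
static void FlattenRow(const std::vector<const uint8_t*>& tuples,
    const std::vector<int>& fixed_sizes, uint8_t* out) {
  int num_tuples = static_cast<int>(tuples.size());
  int null_bytes = (num_tuples + 7) / 8;  // cf. NullIndicatorBytesPerRow()
  memset(out, 0, null_bytes);
  uint8_t* write_ptr = out + null_bytes;
  for (int i = 0; i < num_tuples; ++i) {
    if (tuples[i] == nullptr) {
      out[i >> 3] |= 1 << (7 - (i & 7));  // mark tuple i as NULL
    } else {
      memcpy(write_ptr, tuples[i], fixed_sizes[i]);
      write_ptr += fixed_sizes[i];
    }
  }
}

// Reconstruct pointers to each tuple's fixed-length data, nullptr for NULLs.
static void UnflattenRow(const uint8_t* in, const std::vector<int>& fixed_sizes,
    std::vector<const uint8_t*>* tuples) {
  int num_tuples = static_cast<int>(fixed_sizes.size());
  const uint8_t* read_ptr = in + (num_tuples + 7) / 8;
  for (int i = 0; i < num_tuples; ++i) {
    bool is_null = ((in[i >> 3] >> (7 - (i & 7))) & 1) != 0;
    tuples->push_back(is_null ? nullptr : read_ptr);
    if (!is_null) read_ptr += fixed_sizes[i];
  }
}

int main() {
  int32_t a = 1;
  int64_t b = 2;
  uint8_t buf[64];
  FlattenRow({reinterpret_cast<const uint8_t*>(&a), nullptr,
                 reinterpret_cast<const uint8_t*>(&b)},
      {4, 8, 8}, buf);
  std::vector<const uint8_t*> out;
  UnflattenRow(buf, {4, 8, 8}, &out);
  int64_t b_out;
  memcpy(&b_out, out[2], sizeof(b_out));  // avoid unaligned load
  assert(out[1] == nullptr && b_out == 2);
  return 0;
}
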
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/buffered-tuple-stream-v2.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h new file mode 100644 index 0000000..d707604 --- /dev/null +++ b/be/src/runtime/buffered-tuple-stream-v2.h @@ -0,0 +1,592 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_H +#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_H + +#include <set> +#include <vector> +#include <boost/scoped_ptr.hpp> + +#include "common/global-types.h" +#include "common/status.h" +#include "gutil/macros.h" +#include "runtime/bufferpool/buffer-pool.h" +#include "runtime/row-batch.h" + +namespace impala { + +class MemTracker; +class RuntimeState; +class RowDescriptor; +class SlotDescriptor; +class Tuple; +class TupleRow; + +/// Class that provides an abstraction for a stream of tuple rows backed by BufferPool +/// Pages. Rows can be added to the stream and read back. Rows are returned in the order +/// they are added. +/// +/// The BufferedTupleStream is *not* thread safe from the caller's point of view. +/// Different threads should not concurrently call methods of the same BufferedTupleStream +/// object. +/// +/// Reading and writing the stream: +/// The stream supports two modes of reading/writing, depending on whether +/// PrepareForWrite() is called to initialize a write iterator only or +/// PrepareForReadWrite() is called to initialize both read and write iterators to enable +/// interleaved reads and writes. +/// +/// To use write-only mode, PrepareForWrite() is called once and AddRow()/AllocateRow() +/// are called repeatedly to initialize then advance a write iterator through the stream. +/// Once the stream is fully written, it can be read back by calling PrepareForRead() +/// then GetNext() repeatedly to advance a read iterator through the stream, or by +/// calling GetRows() to get all of the rows at once. +/// +/// To use read/write mode, PrepareForReadWrite() is called once to initialize the read +/// and write iterators. AddRow()/AllocateRow() then advance a write iterator through the +/// stream, and GetNext() advances a trailing read iterator through the stream. +/// +/// Buffer management: +/// The tuple stream is backed by a sequence of BufferPool Pages. The tuple stream uses +/// the client's reservation to pin pages in memory. It will automatically try to +/// increase the client's reservation whenever it needs to do so to make progress. +/// +/// The stream has both pinned and unpinned modes. In the pinned mode all pages are +/// pinned for reading. 
The pinned mode avoids I/O by keeping all pages pinned in memory +/// and allows clients to save pointers to rows in the stream and randomly access them. +/// E.g. hash tables can be backed by a BufferedTupleStream. In the unpinned mode, only +/// pages currently being read and written are pinned and other pages are unpinned and +/// therefore do not use the client's reservation and can be spilled to disk. +/// +/// When the stream is in read/write mode, the stream always uses one buffer's worth +/// of reservation of writing and at least one buffer's worth of reservation for reading, +/// even if the same page is currently being read and written. This means that +/// UnpinStream() always succeeds, and moving to the next write page or read page on an +/// unpinned stream does not require additional reservation. +/// TODO: IMPALA-3208: variable-length pages will add a caveat here. +/// +/// The tuple stream also supports a 'delete_on_read' mode, enabled by passing a flag +/// to PrepareForRead() which deletes the stream's pages as it does a final read +/// pass over the stream. +/// +/// TODO: IMPALA-4179: the buffer management can be simplified once we can attach +/// buffers to RowBatches. +/// +/// Page layout: +/// Rows are stored back to back starting at the first byte of each page's buffer, with +/// no interleaving of data from different rows. There is no padding or alignment +/// between rows. +/// +/// Tuple row layout: +/// If the stream's tuples are nullable (i.e. has_nullable_tuple_ is true), there is a +/// bitstring at the start of each row with null indicators for all tuples in each row +/// (including non-nullable tuples). The bitstring occupies ceil(num_tuples_per_row / 8) +/// bytes. A 1 indicates the tuple is null. +/// +/// The fixed length parts of the row's tuples are stored first, followed by var len data +/// for inlined_string_slots_ and inlined_coll_slots_. Other "external" var len slots can +/// point to var len data outside the stream. When reading the stream, the length of each +/// row's var len data in the stream must be computed to find the next row's start. +/// +/// The tuple stream supports reading from the stream into RowBatches without copying +/// out any data: the RowBatches' Tuple pointers will point directly into the stream's +/// pages' buffers. The fixed length parts follow Impala's internal tuple format, so for +/// the tuple to be valid, we only need to update pointers to point to the var len data +/// in the stream. These pointers need to be updated by the stream because a spilled +/// page's data may be relocated to a different buffer. The pointers are updated lazily +/// upon reading the stream via GetNext() or GetRows(). +/// +/// Example layout for a row with two non-nullable tuples ((1, "hello"), (2, "world")) +/// with all var len data stored in the stream: +/// <---- tuple 1 -----> <------ tuple 2 ------> <- var len -> <- next row ... +/// +--------+-----------+-----------+-----------+-------------+ +/// | IntVal | StringVal | BigIntVal | StringVal | | ... +/// +--------+-----------+-----------+-----------++------------+ +/// | val: 1 | len: 5 | val: 2 | len: 5 | helloworld | ... +/// | | ptr: 0x.. | | ptr: 0x.. | | ... 
+/// +--------+-----------+-----------+-----------+-------------+ +/// <--4b--> <---12b---> <----8b---> <---12b---> <----10b----> +/// +/// Example layout for a row with the second tuple nullable ((1, "hello"), NULL) +/// with all var len data stored in the stream: +/// <- null tuple bitstring -> <---- tuple 1 -----> <- var len -> <- next row ... +/// +-------------------------+--------+-----------+------------+ +/// | | IntVal | StringVal | | ... +/// +-------------------------+--------+-----------+------------+ +/// | 0000 0010 | val: 1 | len: 5 | hello | ... +/// | | | ptr: 0x.. | | ... +/// +-------------------------+--------+-----------+------------+ +/// <---------1b------------> <--4b--> <---12b---> <----5b----> +/// +/// Example layout for a row with a single non-nullable tuple (("hello", "world")) with +/// the second string slot stored externally to the stream: +/// <------ tuple 1 ------> <- var len -> <- next row ... +/// +-----------+-----------+-------------+ +/// | StringVal | StringVal | | ... +/// +-----------+-----------+-------------+ +/// | len: 5 | len: 5 | hello | ... +/// | ptr: 0x.. | ptr: 0x.. | | ... +/// +-----------+-----------+-------------+ +/// <---12b---> <---12b---> <-----5b----> +/// +/// The behavior of reads and writes is as follows: +/// Read: +/// 1. Unpinned: Only a single read page is pinned at a time. This means that only +/// enough reservation to pin a single page is needed to read the stream, regardless +/// of the stream's size. Each page is deleted or unpinned (if delete on read is true +/// or false respectively) before advancing to the next page. +/// 2. Pinned: All pages in the stream are pinned so do not need to be pinned or +/// unpinned when reading from the stream. If delete on read is true, pages are +/// deleted after being read. +/// Write: +/// 1. Unpinned: Unpin pages as they fill up. This means that only a enough reservation +/// to pin a single write page is required to write to the stream, regardless of the +/// stream's size. +/// 2. Pinned: Pages are left pinned. If the next page in the stream cannot be pinned +/// because the caller's reservation is insufficient (and could not be increased by +/// the stream), the read call will fail and the caller can either unpin the stream +/// or free up other memory before retrying. +/// +/// Memory lifetime of rows read from stream: +/// If the stream is pinned and delete on read is false, it is valid to access any tuples +/// returned via GetNext() or GetRows() until the stream is unpinned. If the stream is +/// unpinned or delete on read is true, then the batch returned from GetNext() may have +/// the needs_deep_copy flag set, which means that any tuple memory returned so far from +/// the stream may be freed on the next call to GetNext(). +/// TODO: IMPALA-4179, instead of needs_deep_copy, attach the pages' buffers to the batch. +/// +/// Manual construction of rows with AllocateRow(): +/// The BufferedTupleStream supports allocation of uninitialized rows with AllocateRow(). +/// AllocateRow() is called instead of AddRow() if the caller wants to manually construct +/// a row. The caller of AllocateRow() is responsible for writing the row with exactly the +/// layout described above. +/// +/// If a caller constructs a tuple in this way, the caller can set the pointers and they +/// will not be modified until the stream is read via GetNext() or GetRows(). +/// TODO: IMPALA-5007: try to remove AllocateRow() by unifying with AddRow(). 
+/// +/// TODO: we need to be able to do read ahead for pages. We need some way to indicate a +/// page will need to be pinned soon. +class BufferedTupleStreamV2 { + public: + /// A pointer to the start of a flattened TupleRow in the stream. + typedef uint8_t* FlatRowPtr; + + /// row_desc: description of rows stored in the stream. This is the desc for rows + /// that are added and the rows being returned. + /// page_len: the size of pages to use in the stream + /// TODO:IMPALA-3208: support a default and maximum page length + /// ext_varlen_slots: set of varlen slots with data stored externally to the stream + BufferedTupleStreamV2(RuntimeState* state, const RowDescriptor& row_desc, + BufferPool::ClientHandle* buffer_pool_client, int64_t page_len, + const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>()); + + virtual ~BufferedTupleStreamV2(); + + /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called + /// once before any of the other APIs. + /// If 'pinned' is true, the tuple stream starts off pinned, otherwise it is unpinned. + /// 'node_id' is only used for error reporting. + Status Init(int node_id, bool pinned) WARN_UNUSED_RESULT; + + /// Prepares the stream for writing by attempting to allocate a write buffer. Tries to + /// increase reservation if there is not enough unused reservation for the buffer. + /// Called after Init() and before the first AddRow() or AllocateRow() call. + /// 'got_reservation': set to true if there was enough reservation to initialize the + /// first write page and false if there was not enough reservation and no other + /// error was encountered. Undefined if an error status is returned. + Status PrepareForWrite(bool* got_reservation) WARN_UNUSED_RESULT; + + /// Prepares the stream for interleaved reads and writes by allocating read and write + /// buffers. Called after Init() and before the first AddRow() or AllocateRow() call. + /// delete_on_read: Pages are deleted after they are read. + /// 'got_reservation': set to true if there was enough reservation to initialize the + /// read and write pages and false if there was not enough reservation and no other + /// error was encountered. Undefined if an error status is returned. + Status PrepareForReadWrite( + bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT; + + /// Prepares the stream for reading, invalidating the write iterator (if there is one). + /// Therefore must be called after the last AddRow() or AllocateRow() and before + /// GetNext(). PrepareForRead() can be called multiple times to do multiple read passes + /// over the stream, unless PrepareForRead() or PrepareForReadWrite() was previously + /// called with delete_on_read = true. + /// delete_on_read: Pages are deleted after they are read. + /// 'got_reservation': set to true if there was enough reservation to initialize the + /// first read page and false if there was not enough reservation and no other + /// error was encountered. Undefined if an error status is returned. + Status PrepareForRead(bool delete_on_read, bool* got_reservation) WARN_UNUSED_RESULT; + + /// Adds a single row to the stream. There are three possible outcomes: + /// a) The append succeeds. True is returned. + /// b) The append fails because the unused reservation was not sufficient to add + /// a new page to the stream and the stream could not increase the reservation + /// sufficiently. Returns false and sets 'status' to OK. The append can be retried + /// after freeing up memory or unpinning the stream. 
+ /// c) The append fails with a runtime error. Returns false and sets 'status' to an + /// error. + /// d) The append fails becase the row is too large to fit in a page of a stream. + /// Returns false and sets 'status' to an error. + /// + /// Unpinned streams avoid case b) because memory is automatically freed up by + /// unpinning the current write page. + /// TODO: IMPALA-3808: update to reflect behaviour with variable-length pages + /// + /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow() + /// returns an error, it should not be called again. + bool AddRow(TupleRow* row, Status* status) noexcept WARN_UNUSED_RESULT; + + /// Allocates space to store a row of with fixed length 'fixed_size' and variable + /// length data 'varlen_size'. If successful, returns the pointer where fixed length + /// data should be stored and assigns 'varlen_data' to where var-len data should + /// be stored. AllocateRow does not currently support nullable tuples. + /// + /// The meaning of the return values are the same as AddRow(), except failure is + /// indicated by returning NULL instead of false. + uint8_t* AllocateRow( + int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status); + + /// Unflattens 'flat_row' into a regular TupleRow 'row'. Only valid to call if the + /// stream is pinned. The row must have been allocated with the stream's row desc. + /// The returned 'row' is backed by memory from the stream so is only valid as long + /// as the stream is pinned. + void GetTupleRow(FlatRowPtr flat_row, TupleRow* row) const; + + /// Pins all pages in this stream and switches to pinned mode. Has no effect if the + /// stream is already pinned. + /// If the current unused reservation is not sufficient to pin the stream in memory, + /// this will try to increase the reservation. If that fails, 'pinned' is set to false + /// and the stream is left unpinned. Otherwise 'pinned' is set to true. + Status PinStream(bool* pinned) WARN_UNUSED_RESULT; + + /// Modes for UnpinStream(). + enum UnpinMode { + /// All pages in the stream are unpinned and the read/write positions in the stream + /// are reset. No more rows can be written to the stream after this. The stream can + /// be re-read from the beginning by calling PrepareForRead(). + UNPIN_ALL, + /// All pages are unpinned aside from the current read and write pages (if any), + /// which is left in the same state. The unpinned stream can continue being read + /// or written from the current read or write positions. + UNPIN_ALL_EXCEPT_CURRENT, + }; + + /// Unpins stream with the given 'mode' as described above. + void UnpinStream(UnpinMode mode); + + /// Get the next batch of output rows, which are backed by the stream's memory. + /// If the stream is unpinned or 'delete_on_read' is true, the 'needs_deep_copy' + /// flag may be set on 'batch' to signal that memory will be freed on the next + /// call to GetNext() and that the caller should copy out any data it needs from + /// rows in 'batch' or in previous batches returned from GetNext(). + /// + /// If the stream is pinned and 'delete_on_read' is false, the memory backing the + /// rows will remain valid until the stream is unpinned, destroyed, etc. + /// TODO: IMPALA-4179: update when we simplify the memory transfer model. + Status GetNext(RowBatch* batch, bool* eos) WARN_UNUSED_RESULT; + + /// Same as above, but populate 'flat_rows' with a pointer to the flat version of + /// each returned row in the pinned stream. 
The pointers in 'flat_rows' are only + /// valid as long as the stream remains pinned. + Status GetNext( + RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows) WARN_UNUSED_RESULT; + + /// Returns all the rows in the stream in batch. This pins the entire stream in the + /// process. If the current unused reservation is not sufficient to pin the stream in + /// memory, this will try to increase the reservation. If that fails, 'got_rows' is set + /// to false. + Status GetRows(MemTracker* tracker, boost::scoped_ptr<RowBatch>* batch, + bool* got_rows) WARN_UNUSED_RESULT; + + /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL, + /// attaches buffers from any pinned pages to the batch and deletes unpinned + /// pages. Otherwise deletes all pages. Does nothing if the stream was already + /// closed. The 'flush' mode is forwarded to RowBatch::AddBuffer() when attaching + /// buffers. + void Close(RowBatch* batch, RowBatch::FlushMode flush); + + /// Number of rows in the stream. + int64_t num_rows() const { return num_rows_; } + + /// Number of rows returned via GetNext(). + int64_t rows_returned() const { return rows_returned_; } + + /// Returns the byte size necessary to store the entire stream in memory. + int64_t byte_size() const { return total_byte_size_; } + + /// Returns the number of bytes currently pinned in memory by the stream. + /// If ignore_current is true, the write_page_ memory is not included. + int64_t BytesPinned(bool ignore_current) const { + if (ignore_current && write_page_ != nullptr && write_page_->is_pinned()) { + return bytes_pinned_ - write_page_->len(); + } + return bytes_pinned_; + } + + bool is_closed() const { return closed_; } + bool is_pinned() const { return pinned_; } + bool has_read_iterator() const { return read_page_ != pages_.end(); } + bool has_write_iterator() const { return write_page_ != nullptr; } + + std::string DebugString() const; + + private: + DISALLOW_COPY_AND_ASSIGN(BufferedTupleStreamV2); + friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test; + friend class ArrayTupleStreamTest_TestComputeRowSize_Test; + friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test; + friend class SimpleTupleStreamTest_TestGetRowsOverflow_Test; + + /// Wrapper around BufferPool::PageHandle that tracks additional info about the page. + struct Page { + Page() : num_rows(0) {} + + inline int len() const { return handle.len(); } + inline uint8_t* data() const { return handle.data(); } + inline bool is_pinned() const { return handle.is_pinned(); } + inline int pin_count() const { return handle.pin_count(); } + std::string DebugString() const; + + BufferPool::PageHandle handle; + + /// Number of rows written to the page. + int num_rows; + }; + + /// Runtime state instance used to check for cancellation. Not owned. + RuntimeState* const state_; + + /// Description of rows stored in the stream. + const RowDescriptor& desc_; + + /// Sum of the fixed length portion of all the tuples in desc_, including any null + /// indicators. + int fixed_tuple_row_size_; + + /// The size of the fixed length portion for each tuple in the row. + std::vector<int> fixed_tuple_sizes_; + + /// Vectors of all the strings slots that have their varlen data stored in stream + /// grouped by tuple_idx. + std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_string_slots_; + + /// Vectors of all the collection slots that have their varlen data stored in the + /// stream, grouped by tuple_idx. 
+ std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_coll_slots_; + + /// Buffer pool and client used to allocate, pin and release pages. Not owned. + BufferPool* buffer_pool_; + BufferPool::ClientHandle* buffer_pool_client_; + + /// List of pages in the stream. + /// Empty before PrepareForWrite() is called or after the stream has been destructively + /// read in 'delete_on_read' mode. Non-empty otherwise. + std::list<Page> pages_; + + /// Total size of pages_, including any pages already deleted in 'delete_on_read' + /// mode. + int64_t total_byte_size_; + + /// Iterator pointing to the current page for reading. Equal to list.end() when no + /// read iterator is active. GetNext() does not advance this past the end of + /// the stream, so upon eos 'read_page_' points to the last page and + /// rows_returned_ == num_rows_. Always pinned, unless a Pin() call failed and an + /// error status was returned. + std::list<Page>::iterator read_page_; + + /// Number of rows returned from the current read_page_. + uint32_t read_page_rows_returned_; + + /// Pointer into read_page_ to the byte after the last row read. + uint8_t* read_ptr_; + + /// Pointer into write_page_ to the byte after the last row written. + uint8_t* write_ptr_; + + /// Pointer to one byte past the end of write_page_. Cached to speed up computation + uint8_t* write_end_ptr_; + + /// Number of rows returned to the caller from GetNext() since the last + /// PrepareForRead() call. + int64_t rows_returned_; + + /// The current page for writing. NULL if there is no available page to write to. + /// Always pinned. If 'read_page_' and 'write_page_' reference the same page, then + /// that page is only pinned once. + Page* write_page_; + + /// Total bytes of pinned pages in pages_, stored to avoid iterating over the list + /// to compute it. + int64_t bytes_pinned_; + + /// Number of rows stored in the stream. Includes rows that were already deleted during + /// a destructive 'delete_on_read' pass over the stream. + int64_t num_rows_; + + /// The length in bytes of pages used to store the stream's rows. + /// TODO: IMPALA-3808: support variable-length pages + const int64_t page_len_; + + /// Whether any tuple in the rows is nullable. + const bool has_nullable_tuple_; + + /// If true, pages are deleted after they are read. + bool delete_on_read_; + + bool closed_; // Used for debugging. + + /// If true, this stream has been explicitly pinned by the caller and all pages are + /// kept pinned until the caller calls UnpinStream(). + bool pinned_; + + bool is_read_page(const Page* page) const { + return has_read_iterator() && &*read_page_ == page; + } + + bool is_write_page(const Page* page) const { return write_page_ == page; } + + /// The slow path for AddRow() that is called if there is not sufficient space in + /// the current page. + bool AddRowSlow(TupleRow* row, Status* status) noexcept; + + /// The slow path for AllocateRow() that is called if there is not sufficient space in + /// the current page. + uint8_t* AllocateRowSlow( + int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) noexcept; + + /// Copies 'row' into write_page_. Returns false if there is not enough space in + /// 'write_page_'. After returning false, write_ptr_ may be left pointing to the + /// partially-written row, and no more data can be written to write_page_. + template <bool HAS_NULLABLE_TUPLE> + bool DeepCopyInternal(TupleRow* row) noexcept; + + /// Helper function to copy strings in string_slots from tuple into write_page_. 
+ /// Updates write_ptr_ to the end of the string data added. Returns false if the data + /// does not fit in the current write page. After returning false, write_ptr_ is left + /// pointing to the partially-written row, and no more data can be written to + /// write_page_. + bool CopyStrings(const Tuple* tuple, const std::vector<SlotDescriptor*>& string_slots); + + /// Helper function to deep copy collections in collection_slots from tuple into + /// write_page_. Updates write_ptr_ to the end of the collection data added. Returns + /// false if the data does not fit in the current write page. After returning false, + /// write_ptr_ is left pointing to the partially-written row, and no more data can be + /// written to write_page_. + bool CopyCollections( + const Tuple* tuple, const std::vector<SlotDescriptor*>& collection_slots); + + /// Wrapper of the templated DeepCopyInternal() function. + bool DeepCopy(TupleRow* row) noexcept; + + /// Gets a new page of 'page_len_' bytes from buffer_pool_, updating write_page_, + /// write_ptr_ and write_end_ptr_. The caller must ensure there is sufficient unused + /// reservation to allocate the page. The caller must reset the write iterator (if + /// there is one). + Status NewWritePage() noexcept WARN_UNUSED_RESULT; + + /// Validates that a page can fit a row of 'row_size' bytes. + /// Returns an error if the row cannot fit in a page. + Status CheckPageSizeForRow(int64_t row_size); + + /// Wrapper around NewWritePage() that allocates a new write page that fits a row of + /// 'row_size' bytes. Increases reservation if needed to allocate the next page. + /// Returns OK and sets 'got_reservation' to true if the write page was successfully + /// allocated. Returns an error if the row cannot fit in a page. Returns OK and sets + /// 'got_reservation' to false if the reservation could not be increased and no other + /// error was encountered. + Status AdvanceWritePage( + int64_t row_size, bool* got_reservation) noexcept WARN_UNUSED_RESULT; + + /// Reset the write page, if there is one, and unpin pages accordingly. + void ResetWritePage(); + + /// Same as PrepareForRead(), except the iterators are not invalidated and + /// the caller is assumed to have checked there is sufficient unused reservation. + Status PrepareForReadInternal(bool delete_on_read) WARN_UNUSED_RESULT; + + /// Pins the next read page. This blocks reading from disk if necessary to bring the + /// page's data into memory. Updates read_page_, read_ptr_, and + /// read_page_rows_returned_. + Status NextReadPage() WARN_UNUSED_RESULT; + + /// Reset the read page, if there is one, and unpin pages accordingly. + void ResetReadPage(); + + /// Returns the total additional bytes that this row will consume in write_page_ if + /// appended to the page. This includes the row's null indicators, the fixed length + /// part of the row and the data for inlined_string_slots_ and inlined_coll_slots_. + int64_t ComputeRowSize(TupleRow* row) const noexcept; + + /// Pins page and updates tracking stats. + Status PinPage(Page* page) WARN_UNUSED_RESULT; + + /// Increment the page's pin count if this page needs a higher pin count given the + /// current read and write iterator positions and whether the stream will be pinned + /// ('stream_pinned'). Assumes that no scenarios occur when the pin count needs to + /// be incremented multiple times. The caller is responsible for ensuring sufficient + /// reservation is available. 
+ Status PinPageIfNeeded(Page* page, bool stream_pinned) WARN_UNUSED_RESULT; + + /// Decrement the page's pin count if this page needs a lower pin count given the + /// current read and write iterator positions and whether the stream will be pinned + /// ('stream_pinned'). Assumes that no scenarios occur when the pin count needs to + /// be decremented multiple times. + void UnpinPageIfNeeded(Page* page, bool stream_pinned); + + /// Return the expected pin count for 'page' in the current stream based on the current + /// read and write pages and whether the stream is pinned. + int ExpectedPinCount(bool stream_pinned, const Page* page) const; + + /// Templated GetNext implementations. + template <bool FILL_FLAT_ROWS> + Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows); + template <bool FILL_FLAT_ROWS, bool HAS_NULLABLE_TUPLE> + Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows); + + /// Helper function to convert a flattened TupleRow stored starting at '*data' into + /// 'row'. *data is updated to point to the first byte past the end of the row. + template <bool HAS_NULLABLE_TUPLE> + void UnflattenTupleRow(uint8_t** data, TupleRow* row) const; + + /// Helper function for GetNextInternal(). For each string slot in string_slots, + /// update StringValue's ptr field to point to the corresponding string data stored + /// inline in the stream (at the current value of read_ptr_) and advance read_ptr_ by the + /// StringValue's length field. + void FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots, Tuple* tuple); + + /// Helper function for GetNextInternal(). For each collection slot in collection_slots, + /// recursively update any pointers in the CollectionValue to point to the corresponding + /// var len data stored inline in the stream, advancing read_ptr_ as data is read. + /// Assumes that the collection was serialized to the stream in DeepCopy()'s format. + void FixUpCollectionsForRead( + const vector<SlotDescriptor*>& collection_slots, Tuple* tuple); + + /// Returns the number of null indicator bytes per row. Only valid if this stream has + /// nullable tuples. + int NullIndicatorBytesPerRow() const; + + /// Returns the total bytes pinned. Only called in DCHECKs to validate bytes_pinned_. + int64_t CalcBytesPinned() const; + + /// DCHECKs if the stream is internally inconsistent. The stream should always be in + /// a consistent state after returning success from a public API call. + void CheckConsistency() const; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/buffered-tuple-stream-v2.inline.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream-v2.inline.h b/be/src/runtime/buffered-tuple-stream-v2.inline.h new file mode 100644 index 0000000..6ad4bc4 --- /dev/null +++ b/be/src/runtime/buffered-tuple-stream-v2.inline.h @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_INLINE_H +#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_V2_INLINE_H + +#include "runtime/buffered-tuple-stream-v2.h" + +#include "runtime/descriptors.h" +#include "runtime/tuple-row.h" +#include "util/bit-util.h" + +namespace impala { + +inline int BufferedTupleStreamV2::NullIndicatorBytesPerRow() const { + DCHECK(has_nullable_tuple_); + return BitUtil::RoundUpNumBytes(fixed_tuple_sizes_.size()); +} + +inline bool BufferedTupleStreamV2::AddRow(TupleRow* row, Status* status) noexcept { + DCHECK(!closed_); + if (LIKELY(DeepCopy(row))) return true; + return AddRowSlow(row, status); +} + +inline uint8_t* BufferedTupleStreamV2::AllocateRow( + int fixed_size, int varlen_size, uint8_t** varlen_data, Status* status) { + DCHECK(!closed_); + DCHECK(!has_nullable_tuple_) << "AllocateRow does not support nullable tuples"; + const int total_size = fixed_size + varlen_size; + if (UNLIKELY(write_page_ == nullptr || write_ptr_ + total_size > write_end_ptr_)) { + return AllocateRowSlow(fixed_size, varlen_size, varlen_data, status); + } + DCHECK(write_page_ != nullptr); + DCHECK(write_page_->is_pinned()); + DCHECK_LE(write_ptr_ + total_size, write_end_ptr_); + ++num_rows_; + ++write_page_->num_rows; + + uint8_t* fixed_data = write_ptr_; + write_ptr_ += fixed_size; + *varlen_data = write_ptr_; + write_ptr_ += varlen_size; + return fixed_data; +} +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/bufferpool/buffer-pool-test.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc index 0e0d384..d4640ac 100644 --- a/be/src/runtime/bufferpool/buffer-pool-test.cc +++ b/be/src/runtime/bufferpool/buffer-pool-test.cc @@ -44,7 +44,10 @@ namespace impala { class BufferPoolTest : public ::testing::Test { public: - virtual void SetUp() { test_env_ = obj_pool_.Add(new TestEnv); } + virtual void SetUp() { + test_env_ = obj_pool_.Add(new TestEnv); + ASSERT_OK(test_env_->Init()); + } virtual void TearDown() { for (auto entry : query_reservations_) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 850c90b..611520c 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -27,6 +27,8 @@ #include "gen-cpp/CatalogService.h" #include "gen-cpp/ImpalaInternalService.h" #include "runtime/backend-client.h" +#include "runtime/bufferpool/buffer-pool.h" +#include "runtime/bufferpool/reservation-tracker.h" #include "runtime/client-cache.h" #include "runtime/coordinator.h" #include "runtime/data-stream-mgr.h" @@ -42,6 +44,7 @@ #include "scheduling/scheduler.h" #include "service/frontend.h" #include "statestore/statestore-subscriber.h" +#include "util/bit-util.h" #include "util/debug-util.h" #include "util/debug-util.h" #include "util/default-path-handlers.h" @@ -148,6 +151,8 @@ 
ExecEnv::ExecEnv() "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())), async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), query_exec_mgr_(new QueryExecMgr()), + buffer_reservation_(nullptr), + buffer_pool_(nullptr), enable_webserver_(FLAGS_enable_webserver), is_fe_tests_(false), backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) { @@ -202,6 +207,8 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, "worker", FLAGS_coordinator_rpc_threads, numeric_limits<int32_t>::max())), async_rpc_pool_(new CallableThreadPool("rpc-pool", "async-rpc-sender", 8, 10000)), query_exec_mgr_(new QueryExecMgr()), + buffer_reservation_(nullptr), + buffer_pool_(NULL), enable_webserver_(FLAGS_enable_webserver && webserver_port > 0), is_fe_tests_(false), backend_address_(MakeNetworkAddress(FLAGS_hostname, FLAGS_be_port)) { @@ -229,7 +236,10 @@ ExecEnv::ExecEnv(const string& hostname, int backend_port, int subscriber_port, exec_env_ = this; } -ExecEnv::~ExecEnv() {} +ExecEnv::~ExecEnv() { + if (buffer_reservation_ != nullptr) buffer_reservation_->Close(); + disk_io_mgr_.reset(); // Need to tear down before mem_tracker_. +} Status ExecEnv::InitForFeTests() { mem_tracker_.reset(new MemTracker(-1, "Process")); @@ -273,18 +283,6 @@ Status ExecEnv::StartServices() { if (bytes_limit < 0) { return Status("Failed to parse mem limit from '" + FLAGS_mem_limit + "'."); } - // Minimal IO Buffer requirements: - // IO buffer (8MB default) * number of IO buffers per thread (5) * - // number of threads per core * number of cores - int64_t min_requirement = disk_io_mgr_->max_read_buffer_size() * - DiskIoMgr::DEFAULT_QUEUE_CAPACITY * - FLAGS_num_threads_per_core * FLAGS_num_cores; - if (bytes_limit < min_requirement) { - LOG(WARNING) << "Memory limit " - << PrettyPrinter::Print(bytes_limit, TUnit::BYTES) - << " does not meet minimal memory requirement of " - << PrettyPrinter::Print(min_requirement, TUnit::BYTES); - } metrics_->Init(enable_webserver_ ? webserver_.get() : NULL); impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends"); @@ -353,4 +351,10 @@ Status ExecEnv::StartServices() { return Status::OK(); } +void ExecEnv::InitBufferPool(int64_t min_page_size, int64_t capacity) { + DCHECK(buffer_pool_ == nullptr); + buffer_pool_.reset(new BufferPool(min_page_size, capacity)); + buffer_reservation_.reset(new ReservationTracker()); + buffer_reservation_->InitRootTracker(NULL, capacity); +} } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/exec-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index bdeb4a4..a5777ef 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -28,6 +28,7 @@ namespace impala { +class BufferPool; class CallableThreadPool; class DataStreamMgr; class DiskIoMgr; @@ -42,6 +43,7 @@ class PoolMemTrackerRegistry; class MetricGroup; class QueryResourceMgr; class RequestPoolService; +class ReservationTracker; class Scheduler; class StatestoreSubscriber; class TestExecEnv; @@ -65,8 +67,7 @@ class ExecEnv { /// we return the most recently created instance. static ExecEnv* GetInstance() { return exec_env_; } - /// Empty destructor because the compiler-generated one requires full - /// declarations for classes in scoped_ptrs. + /// Destructor - only used in backend tests that create new environment per test. 
virtual ~ExecEnv(); void SetImpalaServer(ImpalaServer* server) { impala_server_ = server; } @@ -99,6 +100,8 @@ class ExecEnv { CallableThreadPool* rpc_pool() { return async_rpc_pool_.get(); } QueryExecMgr* query_exec_mgr() { return query_exec_mgr_.get(); } PoolMemTrackerRegistry* pool_mem_trackers() { return pool_mem_trackers_.get(); } + ReservationTracker* buffer_reservation() { return buffer_reservation_.get(); } + BufferPool* buffer_pool() { return buffer_pool_.get(); } void set_enable_webserver(bool enable) { enable_webserver_ = enable; } @@ -143,12 +146,20 @@ class ExecEnv { boost::scoped_ptr<CallableThreadPool> async_rpc_pool_; boost::scoped_ptr<QueryExecMgr> query_exec_mgr_; + /// Query-wide buffer pool and the root reservation tracker for the pool. The + /// reservation limit is equal to the maximum capacity of the pool. + /// For now this is only used by backend tests that create them via InitBufferPool(); + boost::scoped_ptr<ReservationTracker> buffer_reservation_; + boost::scoped_ptr<BufferPool> buffer_pool_; + /// Not owned by this class ImpalaServer* impala_server_; bool enable_webserver_; private: + friend class TestEnv; + static ExecEnv* exec_env_; bool is_fe_tests_; @@ -157,6 +168,9 @@ class ExecEnv { /// fs.defaultFs value set in core-site.xml std::string default_fs_; + + /// Initialise 'buffer_pool_' and 'buffer_reservation_' with given capacity. + void InitBufferPool(int64_t min_page_len, int64_t capacity); }; } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 2b2e3a0..b931808 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -15,16 +15,18 @@ // specific language governing permissions and limitations // under the License. - #include "runtime/query-state.h" -#include <boost/thread/locks.hpp> #include <boost/thread/lock_guard.hpp> +#include <boost/thread/locks.hpp> +#include "runtime/bufferpool/buffer-pool.h" +#include "runtime/bufferpool/reservation-tracker.h" #include "runtime/exec-env.h" #include "runtime/fragment-instance-state.h" #include "runtime/mem-tracker.h" #include "runtime/query-exec-mgr.h" +#include "util/debug-util.h" #include "common/names.h" @@ -41,7 +43,12 @@ QueryState::ScopedRef::~ScopedRef() { } QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool) - : query_ctx_(query_ctx), refcnt_(0), prepared_(false), released_resources_(false) { + : query_ctx_(query_ctx), + refcnt_(0), + prepared_(false), + released_resources_(false), + buffer_reservation_(nullptr), + file_group_(nullptr) { TQueryOptions& query_options = query_ctx_.client_request.query_options; // max_errors does not indicate how many errors in total have been recorded, but rather // how many are distinct. It is defined as the sum of the number of generic errors and @@ -56,8 +63,12 @@ QueryState::QueryState(const TQueryCtx& query_ctx, const std::string& pool) } void QueryState::ReleaseResources() { + // Clean up temporary files. + if (file_group_ != nullptr) file_group_->Close(); + // Release any remaining reservation. + if (buffer_reservation_ != nullptr) buffer_reservation_->Close(); // Avoid dangling reference from the parent of 'query_mem_tracker_'. 
- query_mem_tracker_->UnregisterFromParent(); + if (query_mem_tracker_ != nullptr) query_mem_tracker_->UnregisterFromParent(); released_resources_ = true; } @@ -77,19 +88,27 @@ Status QueryState::Prepare() { // Starting a new query creates threads and consumes a non-trivial amount of memory. // If we are already starved for memory, fail as early as possible to avoid consuming // more resources. - MemTracker* process_mem_tracker = ExecEnv::GetInstance()->process_mem_tracker(); + ExecEnv* exec_env = ExecEnv::GetInstance(); + MemTracker* process_mem_tracker = exec_env->process_mem_tracker(); if (process_mem_tracker->LimitExceeded()) { string msg = Substitute("Query $0 could not start because the backend Impala daemon " "is over its memory limit", PrintId(query_id())); - prepare_status_ = process_mem_tracker->MemLimitExceeded(NULL, msg, 0); - return prepare_status_; + status = process_mem_tracker->MemLimitExceeded(NULL, msg, 0); + goto error; + } + // Do buffer-pool-related setup if running in a backend test that explicitly created + // the pool. + if (exec_env->buffer_pool() != nullptr) { + status = InitBufferPoolState(); + if (!status.ok()) goto error; } - - // TODO: IMPALA-3748: acquire minimum buffer reservation at this point. - prepared_ = true; return Status::OK(); + +error: + prepare_status_ = status; + return status; } void QueryState::InitMemTrackers(const std::string& pool) { @@ -103,6 +122,37 @@ void QueryState::InitMemTrackers(const std::string& pool) { MemTracker::CreateQueryMemTracker(query_id(), query_options(), pool, &obj_pool_); } +Status QueryState::InitBufferPoolState() { + ExecEnv* exec_env = ExecEnv::GetInstance(); + int64_t query_mem_limit = query_mem_tracker_->limit(); + if (query_mem_limit == -1) query_mem_limit = numeric_limits<int64_t>::max(); + + // TODO: IMPALA-3200: add a default upper bound to buffer pool memory derived from + // query_mem_limit. + int64_t max_reservation = numeric_limits<int64_t>::max(); + if (query_options().__isset.max_block_mgr_memory + && query_options().max_block_mgr_memory > 0) { + max_reservation = query_options().max_block_mgr_memory; + } + + // TODO: IMPALA-3748: claim the query-wide minimum reservation. + // For now, rely on exec nodes to grab their minimum reservation during Prepare(). + buffer_reservation_ = obj_pool_.Add(new ReservationTracker); + buffer_reservation_->InitChildTracker( + NULL, exec_env->buffer_reservation(), query_mem_tracker_, max_reservation); + + // TODO: once there's a mechanism for reporting non-fragment-local profiles, + // should make sure to report this profile so it's not going into a black hole. + RuntimeProfile* dummy_profile = obj_pool_.Add(new RuntimeProfile(&obj_pool_, "dummy")); + // Only create file group if spilling is enabled. 
+ if (query_options().scratch_limit != 0 && !query_ctx_.disable_spilling) { + file_group_ = obj_pool_.Add( + new TmpFileMgr::FileGroup(exec_env->tmp_file_mgr(), exec_env->disk_io_mgr(), + dummy_profile, query_id(), query_options().scratch_limit)); + } + return Status::OK(); +} + void QueryState::RegisterFInstance(FragmentInstanceState* fis) { VLOG_QUERY << "RegisterFInstance(): instance_id=" << PrintId(fis->instance_id()); lock_guard<SpinLock> l(fis_map_lock_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/query-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h index c381dfe..650e8bf 100644 --- a/be/src/runtime/query-state.h +++ b/be/src/runtime/query-state.h @@ -26,6 +26,7 @@ #include "common/object-pool.h" #include "gen-cpp/ImpalaInternalService_types.h" #include "gen-cpp/Types_types.h" +#include "runtime/tmp-file-mgr.h" #include "util/spinlock.h" #include "util/uid-util.h" @@ -33,6 +34,7 @@ namespace impala { class FragmentInstanceState; class MemTracker; +class ReservationTracker; /// Central class for all backend execution state (example: the FragmentInstanceStates /// of the individual fragment instances) created for a particular query. @@ -94,6 +96,8 @@ class QueryState { } MemTracker* query_mem_tracker() const { return query_mem_tracker_; } + ReservationTracker* buffer_reservation() const { return buffer_reservation_; } + TmpFileMgr::FileGroup* file_group() const { return file_group_; } /// Sets up state required for fragment execution: memory reservations, etc. Fails /// if resources could not be acquired. Safe to call concurrently and idempotent: @@ -145,12 +149,26 @@ class QueryState { /// The top-level MemTracker for this query (owned by obj_pool_). MemTracker* query_mem_tracker_; + /// Buffer reservation for this query (owned by obj_pool_). + /// Only non-null in backend tests that explicitly enabled the new buffer pool. + /// TODO: this will always be non-null once IMPALA-3200 is done. + ReservationTracker* buffer_reservation_; + + /// Temporary files for this query (owned by obj_pool_). + /// Only non-null in backend tests that explicitly enabled the new buffer pool. + /// TODO: this will always be non-null once IMPALA-3200 is done. + TmpFileMgr::FileGroup* file_group_; + /// Create QueryState w/ copy of query_ctx and refcnt of 0. /// The query is associated with the resource pool named 'pool' QueryState(const TQueryCtx& query_ctx, const std::string& pool); /// Called from Prepare() to initialize MemTrackers. void InitMemTrackers(const std::string& pool); + + /// Called from Prepare() to set up buffer reservations and the + /// file group. Fails if required resources are not available.
+ Status InitBufferPoolState(); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/row-batch.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index 6daa02f..8fb0b55 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -17,17 +17,18 @@ #include "runtime/row-batch.h" -#include <stdint.h> // for intptr_t +#include <stdint.h> // for intptr_t #include <boost/scoped_ptr.hpp> +#include "gen-cpp/Results_types.h" +#include "runtime/exec-env.h" #include "runtime/mem-tracker.h" #include "runtime/string-value.h" #include "runtime/tuple-row.h" #include "util/compress.h" -#include "util/decompress.h" #include "util/debug-util.h" +#include "util/decompress.h" #include "util/fixed-size-hash-table.h" -#include "gen-cpp/Results_types.h" #include "common/names.h" @@ -157,6 +158,10 @@ RowBatch::~RowBatch() { for (int i = 0; i < blocks_.size(); ++i) { blocks_[i]->Delete(); } + for (BufferInfo& buffer_info : buffers_) { + ExecEnv::GetInstance()->buffer_pool()->FreeBuffer( + buffer_info.client, &buffer_info.buffer); + } if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) { DCHECK(tuple_ptrs_ != NULL); free(tuple_ptrs_); @@ -305,6 +310,16 @@ void RowBatch::AddBlock(BufferedBlockMgr::Block* block, FlushMode flush) { if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources(); } +void RowBatch::AddBuffer( + BufferPool::ClientHandle* client, BufferPool::BufferHandle buffer, FlushMode flush) { + auxiliary_mem_usage_ += buffer.len(); + BufferInfo buffer_info; + buffer_info.client = client; + buffer_info.buffer = std::move(buffer); + buffers_.push_back(std::move(buffer_info)); + if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources(); +} + void RowBatch::Reset() { num_rows_ = 0; capacity_ = tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*)); @@ -318,6 +333,11 @@ void RowBatch::Reset() { blocks_[i]->Delete(); } blocks_.clear(); + for (BufferInfo& buffer_info : buffers_) { + ExecEnv::GetInstance()->buffer_pool()->FreeBuffer( + buffer_info.client, &buffer_info.buffer); + } + buffers_.clear(); auxiliary_mem_usage_ = 0; if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) { tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_)); @@ -336,6 +356,11 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) { dest->AddBlock(blocks_[i], FlushMode::NO_FLUSH_RESOURCES); } blocks_.clear(); + for (BufferInfo& buffer_info : buffers_) { + dest->AddBuffer( + buffer_info.client, std::move(buffer_info.buffer), FlushMode::NO_FLUSH_RESOURCES); + } + buffers_.clear(); if (needs_deep_copy_) { dest->MarkNeedsDeepCopy(); } else if (flush_ == FlushMode::FLUSH_RESOURCES) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/66328524/be/src/runtime/row-batch.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h index 91433c4..0bb71d8 100644 --- a/be/src/runtime/row-batch.h +++ b/be/src/runtime/row-batch.h @@ -15,18 +15,18 @@ // specific language governing permissions and limitations // under the License. 
- #ifndef IMPALA_RUNTIME_ROW_BATCH_H #define IMPALA_RUNTIME_ROW_BATCH_H -#include <vector> #include <cstring> +#include <vector> #include <boost/scoped_ptr.hpp> #include "codegen/impala-ir.h" #include "common/compiler-util.h" #include "common/logging.h" -#include "runtime/buffered-block-mgr.h" // for BufferedBlockMgr::Block +#include "runtime/buffered-block-mgr.h" +#include "runtime/bufferpool/buffer-pool.h" #include "runtime/descriptors.h" #include "runtime/disk-io-mgr.h" #include "runtime/mem-pool.h" @@ -208,6 +208,7 @@ class RowBatch { MemPool* tuple_data_pool() { return &tuple_data_pool_; } int num_io_buffers() const { return io_buffers_.size(); } int num_blocks() const { return blocks_.size(); } + int num_buffers() const { return buffers_.size(); } /// Resets the row batch, returning all resources it has accumulated. void Reset(); @@ -220,10 +221,18 @@ class RowBatch { /// the original owner, even when the ownership of batches is transferred. If the /// original owner wants the memory to be released, it should call this with 'mode' /// FLUSH_RESOURCES (see MarkFlushResources() for further explanation). - /// TODO: after IMPALA-3200, make the ownership transfer model consistent between - /// Blocks and I/O buffers. void AddBlock(BufferedBlockMgr::Block* block, FlushMode flush); + /// Adds a buffer to this row batch. The buffer is deleted when freeing resources. + /// The buffer's memory remains accounted against the original owner, even when the + /// ownership of batches is transferred. If the original owner wants the memory to be + /// released, it should call this with 'mode' FLUSH_RESOURCES (see MarkFlushResources() + /// for further explanation). + /// TODO: IMPALA-4179: after IMPALA-3200, simplify the ownership transfer model and + /// make it consistent between buffers and I/O buffers. + void AddBuffer( + BufferPool::ClientHandle* client, BufferPool::BufferHandle buffer, FlushMode flush); + /// Used by an operator to indicate that it cannot produce more rows until the /// resources that it has attached to the row batch are freed or acquired by an /// ancestor operator. After this is called, the batch is at capacity and no more rows @@ -424,11 +433,19 @@ class RowBatch { /// are owned by the BufferedBlockMgr. std::vector<BufferedBlockMgr::Block*> blocks_; + struct BufferInfo { + BufferPool::ClientHandle* client; + BufferPool::BufferHandle buffer; + }; + + /// Pages attached to this row batch. See AddBuffer() for ownership semantics. + std::vector<BufferInfo> buffers_; + /// String to write compressed tuple data to in Serialize(). /// This is a string so we can swap() with the string in the TRowBatch we're serializing /// to (we don't compress directly into the TRowBatch in case the compressed data is - /// longer than the uncompressed data). Swapping avoids copying data to the TRowBatch and - /// avoids excess memory allocations: since we reuse RowBatchs and TRowBatchs, and + /// longer than the uncompressed data). Swapping avoids copying data to the TRowBatch + /// and avoids excess memory allocations: since we reuse RowBatchs and TRowBatchs, and /// assuming all row batches are roughly the same size, all strings will eventually be /// allocated to the right size. std::string compression_scratch_;
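For illustration, here is a minimal sketch (not part of the patch) of how a caller might drive the write path declared in buffered-tuple-stream-v2.h above. Only the BufferedTupleStreamV2, Status and RowBatch calls shown in this change are real; the function name, the 8MB page length and the node id are placeholders.

Status WriteBatchToStream(RuntimeState* state, const RowDescriptor& row_desc,
    BufferPool::ClientHandle* client, RowBatch* batch) {
  // Fixed-size pages; 8MB is an arbitrary illustrative value (see the IMPALA-3808 TODO).
  BufferedTupleStreamV2 stream(
      state, row_desc, client, /* page_len */ 8 * 1024 * 1024, std::set<SlotId>());
  RETURN_IF_ERROR(stream.Init(/* node_id */ -1, /* pinned */ true));
  bool got_reservation = false;
  RETURN_IF_ERROR(stream.PrepareForWrite(&got_reservation));
  if (!got_reservation) return Status("Could not reserve the initial write page");
  for (int i = 0; i < batch->num_rows(); ++i) {
    Status status;
    if (!stream.AddRow(batch->GetRow(i), &status)) {
      // AddRow() sets 'status' on a genuine error (e.g. a row that cannot fit in a
      // page); if 'status' is OK, the reservation could not be increased and the
      // caller would unpin, spill or fail the query at this point.
      RETURN_IF_ERROR(status);
      return Status("Out of buffer reservation");
    }
  }
  // PrepareForRead()/GetNext() and the required Close() call are omitted for brevity.
  return Status::OK();
}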
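The AllocateRow() fast path in buffered-tuple-stream-v2.inline.h can be read the same way. The fragment below assumes it runs inside a function returning Status, with 'stream', 'fixed_size' and 'varlen_size' already in scope, and assumes (not shown in the patch) that the slow path returns nullptr on failure.

// Only valid for streams without nullable tuples; 'fixed_size' is the fixed-length
// tuple bytes and 'varlen_size' the bytes of inlined string/collection data.
Status status;
uint8_t* varlen_data = nullptr;
uint8_t* fixed_data = stream.AllocateRow(fixed_size, varlen_size, &varlen_data, &status);
if (fixed_data == nullptr) {
  RETURN_IF_ERROR(status);  // Hard error, e.g. a row larger than a page.
  // 'status' is OK: no reservation for a new write page; unpin or spill and retry.
}
// On success the row is already counted (num_rows_ and the page's num_rows were
// incremented); the caller writes the fixed-length part at 'fixed_data' and the
// var-len data at 'varlen_data' before the next AllocateRow()/AddRow() call.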
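The exec-env.cc and query-state.cc changes wire up a two-level reservation hierarchy. The sketch below restates it using only the ReservationTracker calls visible in this patch; 'pool_capacity', 'query_limit' and 'query_mem_tracker' are placeholders.

// Process-wide root tracker, as in ExecEnv::InitBufferPool(): the reservation limit
// equals the buffer pool capacity.
ReservationTracker root_reservation;
root_reservation.InitRootTracker(NULL, pool_capacity);

// Per-query child tracker, as in QueryState::InitBufferPoolState(): bounded by
// max_block_mgr_memory (when set) and linked to the query's MemTracker.
ReservationTracker query_reservation;
query_reservation.InitChildTracker(
    NULL, &root_reservation, query_mem_tracker, query_limit);

// Teardown order mirrors the patch: the query closes its tracker in
// QueryState::ReleaseResources(); the root tracker is closed in ~ExecEnv().
query_reservation.Close();
root_reservation.Close();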
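Finally, a short illustration of the buffer attachment added to RowBatch. How 'buffer' was allocated is outside this patch; the point is the ownership hand-off and the choice of flush mode.

// 'client' is the BufferPool::ClientHandle that owns the reservation backing 'buffer'.
// After this call the batch is responsible for freeing the buffer (Reset() and the
// destructor call the process-wide buffer_pool()->FreeBuffer() on it), but the memory
// remains accounted against 'client'.
row_batch->AddBuffer(client, std::move(buffer), RowBatch::FlushMode::FLUSH_RESOURCES);
// FLUSH_RESOURCES marks the batch at capacity so the consumer returns resources
// promptly; NO_FLUSH_RESOURCES is what TransferResourceOwnership() uses when handing
// attached buffers to a downstream batch.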
