http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-tuple-stream.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h deleted file mode 100644 index 41d63bf..0000000 --- a/be/src/runtime/buffered-tuple-stream.h +++ /dev/null @@ -1,561 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_H -#define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_H - -#include <vector> -#include <set> - -#include "common/status.h" -#include "runtime/buffered-block-mgr.h" -#include "runtime/row-batch.h" - -namespace impala { - -class BufferedBlockMgr; -class RuntimeProfile; -class RuntimeState; -class RowDescriptor; -class SlotDescriptor; -class TupleRow; - -/// Class that provides an abstraction for a stream of tuple rows. Rows can be -/// added to the stream and returned. Rows are returned in the order they are added. -/// -/// The underlying memory management is done by the BufferedBlockMgr. -/// -/// The tuple stream consists of a number of small (less than IO-sized blocks) before -/// an arbitrary number of IO-sized blocks. 
The smaller blocks do not spill and are -/// there to lower the minimum buffering requirements. For example, an operator that -/// needs to maintain 64 streams (1 buffer per partition) would need, by default, -/// 64 * 8MB = 512MB of buffering. A query with 5 of these operators would require -/// 2.56GB just to run, regardless of how much of that is used. This is -/// problematic for small queries. Instead we will start with a fixed number of small -/// buffers (currently 2 small buffers: one 64KB and one 512KB) and only start using IO -/// sized buffers when those fill up. The small buffers never spill. -/// The stream will *not* automatically switch from using small buffers to IO-sized -/// buffers when all the small buffers for this stream have been used. -/// -/// The BufferedTupleStream is *not* thread safe from the caller's point of view. It is -/// expected that all the APIs are called from a single thread. Internally, the -/// object is thread safe wrt to the underlying block mgr. -/// -/// Buffer management: -/// The stream is either pinned or unpinned, set via PinStream() and UnpinStream(). -/// Blocks are optionally deleted as they are read, set with the delete_on_read argument -/// to PrepareForRead(). -/// -/// Block layout: -/// If the stream's tuples are nullable (i.e. has_nullable_tuple_ is true), there is a -/// bitstring at the start of each block with null indicators for all tuples in each row -/// in the block. The length of the bitstring is a function of the block size. Row data -/// is stored after the null indicators if present, or at the start of the block -/// otherwise. Rows are stored back to back in the stream, with no interleaving of data -/// from different rows. There is no padding or alignment between rows. -/// -/// Null tuples: -/// The order of bits in the null indicators bitstring corresponds to the order of -/// tuples in the block. 
The NULL tuples are not stored in the row iself, only as set -/// bits in the null indicators bitstring. -/// -/// Tuple row layout: -/// The fixed length parts of the row's tuples are stored first, followed by var len data -/// for inlined_string_slots_ and inlined_coll_slots_. Other "external" var len slots can -/// point to var len data outside the stream. When reading the stream, the length of each -/// row's var len data in the stream must be computed to find the next row's start. -/// -/// The tuple stream supports reading from the stream into RowBatches without copying -/// out any data: the RowBatches' Tuple pointers will point directly into the stream's -/// blocks. The fixed length parts follow Impala's internal tuple format, so for the -/// tuple to be valid, we only need to update pointers to point to the var len data -/// in the stream. These pointers need to be updated by the stream because a spilled -/// block may be relocated to a different location in memory. The pointers are updated -/// lazily upon reading the stream via GetNext() or GetRows(). -/// -/// Example layout for a row with two tuples ((1, "hello"), (2, "world")) with all var -/// len data stored in the stream: -/// <---- tuple 1 -----> <------ tuple 2 ------> <- var len -> <- next row ... -/// +--------+-----------+-----------+-----------+-------------+ -/// | IntVal | StringVal | BigIntVal | StringVal | | ... -/// +--------+-----------+-----------+-----------++------------+ -/// | val: 1 | len: 5 | val: 2 | len: 5 | helloworld | ... -/// | | ptr: 0x.. | | ptr: 0x.. | | ... -/// +--------+-----------+-----------+-----------+-------------+ -/// <--4b--> <---12b---> <----8b---> <---12b---> <----10b----> -// -/// Example layout for a row with a single tuple (("hello", "world")) with the second -/// string slot stored externally to the stream: -/// <------ tuple 1 ------> <- var len -> <- next row ... -/// +-----------+-----------+-------------+ -/// | StringVal | StringVal | | ... 
-/// +-----------+-----------+-------------+ -/// | len: 5 | len: 5 | hello | ... -/// | ptr: 0x.. | ptr: 0x.. | | ... -/// +-----------+-----------+-------------+ -/// <---12b---> <---12b---> <-----5b----> -/// -/// The behavior of reads and writes is as follows: -/// Read: -/// 1. Delete on read (delete_on_read_): Blocks are deleted as we go through the stream. -/// The data returned by the tuple stream is valid until the next read call so the -/// caller does not need to copy if it is streaming. -/// 2. Unpinned: Blocks remain in blocks_ and are unpinned after reading. -/// 3. Pinned: Blocks remain in blocks_ and are left pinned after reading. If the next -/// block in the stream cannot be pinned, the read call will fail and the caller needs -/// to free memory from the underlying block mgr. -/// Write: -/// 1. Unpinned: Unpin blocks as they fill up. This means only a single (i.e. the -/// current) block needs to be in memory regardless of the input size (if read_write is -/// true, then two blocks need to be in memory). -/// 2. Pinned: Blocks are left pinned. If we run out of blocks, the write will fail and -/// the caller needs to free memory from the underlying block mgr. -/// -/// Memory lifetime of rows read from stream: -/// If the stream is pinned, it is valid to access any tuples returned via -/// GetNext() or GetRows() until the stream is unpinned. If the stream is unpinned, and -/// the batch returned from GetNext() has the needs_deep_copy flag set, any tuple memory -/// returned so far from the stream may be freed on the next call to GetNext(). -/// -/// Manual construction of rows with AllocateRow(): -/// The BufferedTupleStream supports allocation of uninitialized rows with AllocateRow(). -/// The caller of AllocateRow() is responsible for writing the row with exactly the -/// layout described above. 
-/// -/// If a caller constructs a tuple in this way, the caller can set the pointers and they -/// will not be modified until the stream is read via GetNext() or GetRows(). -/// -/// TODO: we need to be able to do read ahead in the BufferedBlockMgr. It currently -/// only has PinAllBlocks() which is blocking. We need a non-blocking version of this or -/// some way to indicate a block will need to be pinned soon. -/// TODO: see if this can be merged with Sorter::Run. The key difference is that this -/// does not need to return rows in the order they were added, which allows it to be -/// simpler. -/// TODO: we could compact the small buffers when we need to spill but they use very -/// little memory so ths might not be very useful. -/// TODO: improvements: -/// - It would be good to allocate the null indicators at the end of each block and grow -/// this array as new rows are inserted in the block. If we do so, then there will be -/// fewer gaps in case of many rows with NULL tuples. -/// - We will want to multithread this. Add a AddBlock() call so the synchronization -/// happens at the block level. This is a natural extension. -/// - Instead of allocating all blocks from the block_mgr, allocate some blocks that -/// are much smaller (e.g. 16K and doubling up to the block size). This way, very -/// small streams (a common case) will use very little memory. This small blocks -/// are always in memory since spilling them frees up negligible memory. -/// - Return row batches in GetNext() instead of filling one in -class BufferedTupleStream { - public: - /// Ordinal index into the stream to retrieve a row in O(1) time. This index can - /// only be used if the stream is pinned. - /// To read a row from a stream we need three pieces of information that we squeeze in - /// 64 bits: - /// - The index of the block. The block id is stored in 16 bits. We can have up to - /// 64K blocks per tuple stream. With 8MB blocks that is 512GB per stream. 
- /// - The offset of the start of the row (data) within the block. Since blocks are 8MB - /// we use 24 bits for the offsets. (In theory we could use 23 bits.) - /// - The idx of the row in the block. We need this for retrieving the null indicators. - /// We use 24 bits for this index as well. - struct RowIdx { - static const uint64_t BLOCK_MASK = 0xFFFF; - static const uint64_t BLOCK_SHIFT = 0; - static const uint64_t OFFSET_MASK = 0xFFFFFF0000; - static const uint64_t OFFSET_SHIFT = 16; - static const uint64_t IDX_MASK = 0xFFFFFF0000000000; - static const uint64_t IDX_SHIFT = 40; - - uint64_t block() const { - return (data & BLOCK_MASK); - } - - uint64_t offset() const { - return (data & OFFSET_MASK) >> OFFSET_SHIFT; - } - - uint64_t idx() const { - return (data & IDX_MASK) >> IDX_SHIFT; - } - - uint64_t set(uint64_t block, uint64_t offset, uint64_t idx) { - DCHECK_LE(block, BLOCK_MASK) - << "Cannot have more than 2^16 = 64K blocks in a tuple stream."; - DCHECK_LE(offset, OFFSET_MASK >> OFFSET_SHIFT) - << "Cannot have blocks larger than 2^24 = 16MB"; - DCHECK_LE(idx, IDX_MASK >> IDX_SHIFT) - << "Cannot have more than 2^24 = 16M rows in a block."; - data = block | (offset << OFFSET_SHIFT) | (idx << IDX_SHIFT); - return data; - } - - std::string DebugString() const; - - uint64_t data; - }; - - /// row_desc: description of rows stored in the stream. This is the desc for rows - /// that are added and the rows being returned. - /// block_mgr: Underlying block mgr that owns the data blocks. - /// use_initial_small_buffers: If true, the initial N buffers allocated for the - /// tuple stream use smaller than IO-sized buffers. - /// read_write: Stream allows interchanging read and write operations. Requires at - /// least two blocks may be pinned. 
- /// ext_varlen_slots: set of varlen slots with data stored externally to the stream - BufferedTupleStream(RuntimeState* state, const RowDescriptor* row_desc, - BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client, - bool use_initial_small_buffers, bool read_write, - const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>()); - - ~BufferedTupleStream(); - - /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called - /// once before any of the other APIs. - /// If 'pinned' is true, the tuple stream starts of pinned, otherwise it is unpinned. - /// If 'profile' is non-NULL, counters are created. - /// 'node_id' is only used for error reporting. - Status Init(int node_id, RuntimeProfile* profile, bool pinned); - - /// Prepares the stream for writing by attempting to allocate a write block. - /// Called after Init() and before the first AddRow() call. - /// 'got_buffer': set to true if the first write block was successfully pinned, or - /// false if the block could not be pinned and no error was encountered. Undefined - /// if an error status is returned. - Status PrepareForWrite(bool* got_buffer); - - /// Must be called for streams using small buffers to switch to IO-sized buffers. - /// If it fails to get a buffer (i.e. the switch fails) it resets the use_small_buffers_ - /// back to false. - /// TODO: IMPALA-3200: remove this when small buffers are removed. - Status SwitchToIoBuffers(bool* got_buffer); - - /// Adds a single row to the stream. Returns true if the append succeeded, returns false - /// and sets 'status' to OK if appending failed but can be retried or returns false and - /// sets 'status' to an error if an error occurred. - /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow() - /// returns an error, it should not be called again. If appending failed without an - /// error and the stream is using small buffers, it is valid to call - /// SwitchToIoBuffers() then AddRow() again. 
- bool AddRow(TupleRow* row, Status* status) noexcept; - - /// Allocates space to store a row of with fixed length 'fixed_size' and variable - /// length data 'varlen_size'. If successful, returns the pointer where fixed length - /// data should be stored and assigns 'varlen_data' to where var-len data should - /// be stored. Returns NULL if there is not enough memory or an error occurred. - /// Sets *status if an error occurred. The returned memory is guaranteed to all - /// be allocated in the same block. AllocateRow does not currently support nullable - /// tuples. - uint8_t* AllocateRow(int fixed_size, int varlen_size, uint8_t** varlen_data, - Status* status); - - /// Populates 'row' with the row at 'idx'. The stream must be pinned. The row must have - /// been allocated with the stream's row desc. - void GetTupleRow(const RowIdx& idx, TupleRow* row) const; - - /// Prepares the stream for reading. If read_write_, this can be called at any time to - /// begin reading. Otherwise this must be called after the last AddRow() and - /// before GetNext(). - /// delete_on_read: Blocks are deleted after they are read. - /// got_buffer: set to true if the first read block was successfully pinned, or - /// false if the block could not be pinned and no error was encountered. - Status PrepareForRead(bool delete_on_read, bool* got_buffer); - - /// Pins all blocks in this stream and switches to pinned mode. - /// If there is not enough memory, *pinned is set to false and the stream is unmodified. - /// If already_reserved is true, the caller has already made a reservation on - /// block_mgr_client_ to pin the stream. - Status PinStream(bool already_reserved, bool* pinned); - - /// Modes for UnpinStream(). - enum UnpinMode { - /// All blocks in the stream are unpinned and the read/write positions in the stream - /// are reset. No more rows can be written to the stream after this. The stream can - /// be re-read from the beginning by calling PrepareForRead(). 
- UNPIN_ALL, - /// All blocks are unpinned aside from the current read and write blocks (if any), - /// which is left in the same state. The unpinned stream can continue being read - /// or written from the current read or write positions. - UNPIN_ALL_EXCEPT_CURRENT, - }; - - /// Unpins stream with the given 'mode' as described above. - Status UnpinStream(UnpinMode mode); - - /// Get the next batch of output rows. Memory is still owned by the BufferedTupleStream - /// and must be copied out by the caller. - Status GetNext(RowBatch* batch, bool* eos); - - /// Same as above, but also populate 'indices' with the index of each returned row. - Status GetNext(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices); - - /// Returns all the rows in the stream in batch. This pins the entire stream in the - /// process. - /// *got_rows is false if the stream could not be pinned. - Status GetRows(boost::scoped_ptr<RowBatch>* batch, bool* got_rows); - - /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL, - /// attaches any pinned blocks to the batch and deletes unpinned blocks. Otherwise - /// deletes all blocks. Does nothing if the stream was already closed. The 'flush' - /// mode is forwarded to RowBatch::AddBlock() when attaching blocks. - void Close(RowBatch* batch, RowBatch::FlushMode flush); - - /// Number of rows in the stream. - int64_t num_rows() const { return num_rows_; } - - /// Number of rows returned via GetNext(). - int64_t rows_returned() const { return rows_returned_; } - - /// Returns the byte size necessary to store the entire stream in memory. - int64_t byte_size() const { return total_byte_size_; } - - /// Returns the byte size of the stream that is currently pinned in memory. - /// If ignore_current is true, the write_block_ memory is not included. 
- int64_t bytes_in_mem(bool ignore_current) const; - - bool is_closed() const { return closed_; } - bool is_pinned() const { return pinned_; } - int blocks_pinned() const { return num_pinned_; } - int blocks_unpinned() const { return blocks_.size() - num_pinned_ - num_small_blocks_; } - bool has_read_block() const { return read_block_ != blocks_.end(); } - bool has_write_block() const { return write_block_ != NULL; } - bool using_small_buffers() const { return use_small_buffers_; } - - /// Returns true if the row consumes any memory. If false, the stream only needs to - /// store the count of rows. - bool RowConsumesMemory() const { - return fixed_tuple_row_size_ > 0 || has_nullable_tuple_; - } - - std::string DebugString() const; - - private: - friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test; - friend class ArrayTupleStreamTest_TestComputeRowSize_Test; - friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test; - friend class SimpleTupleStreamTest_TestGetRowsOverflow_Test; - - /// Runtime state instance used to check for cancellation. Not owned. - RuntimeState* const state_; - - /// Description of rows stored in the stream. - const RowDescriptor* desc_; - - /// Sum of the fixed length portion of all the tuples in desc_. - int fixed_tuple_row_size_; - - /// The size of the fixed length portion for each tuple in the row. - std::vector<int> fixed_tuple_sizes_; - - /// Max size (in bytes) of null indicators bitmap in the current read and write - /// blocks. If 0, it means that there is no need to store null indicators for this - /// RowDesc. We calculate this value based on the block's size and the - /// fixed_tuple_row_size_. When not 0, this value is also an upper bound for the number - /// of (rows * tuples_per_row) in this block. - int read_block_null_indicators_size_; - int write_block_null_indicators_size_; - - /// Size (in bytes) of the null indicators bitmap reserved in a block of maximum - /// size (i.e. IO block size). 
0 if no tuple is nullable. - int max_null_indicators_size_; - - /// Vectors of all the strings slots that have their varlen data stored in stream - /// grouped by tuple_idx. - std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_string_slots_; - - /// Vectors of all the collection slots that have their varlen data stored in the - /// stream, grouped by tuple_idx. - std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_coll_slots_; - - /// Block manager and client used to allocate, pin and release blocks. Not owned. - BufferedBlockMgr* block_mgr_; - BufferedBlockMgr::Client* block_mgr_client_; - - /// List of blocks in the stream. - std::list<BufferedBlockMgr::Block*> blocks_; - - /// Total size of blocks_, including small blocks. - int64_t total_byte_size_; - - /// Iterator pointing to the current block for read. Equal to list.end() until - /// PrepareForRead() is called. - std::list<BufferedBlockMgr::Block*>::iterator read_block_; - - /// For each block in the stream, the buffer of the start of the block. This is only - /// valid when the stream is pinned, giving random access to data in the stream. - /// This is not maintained for delete_on_read_. - std::vector<uint8_t*> block_start_idx_; - - /// Current idx of the tuple read from the read_block_ buffer. - uint32_t read_tuple_idx_; - - /// Current offset in read_block_ of the end of the last data read. - uint8_t* read_ptr_; - - /// Pointer to one byte past the end of read_block_. - uint8_t* read_end_ptr_; - - /// Current idx of the tuple written at the write_block_ buffer. - uint32_t write_tuple_idx_; - - /// Pointer into write_block_ of the end of the last data written. - uint8_t* write_ptr_; - - /// Pointer to one byte past the end of write_block_. - uint8_t* write_end_ptr_; - - /// Number of rows returned to the caller from GetNext(). - int64_t rows_returned_; - - /// The block index of the current read block in blocks_. - int read_block_idx_; - - /// The current block for writing. 
NULL if there is no available block to write to. - /// The entire write_block_ buffer is marked as allocated, so any data written into - /// the buffer will be spilled without having to allocate additional space. - BufferedBlockMgr::Block* write_block_; - - /// Number of pinned blocks in blocks_, stored to avoid iterating over the list - /// to compute bytes_in_mem and bytes_unpinned. - /// This does not include small blocks. - int num_pinned_; - - /// The total number of small blocks in blocks_; - int num_small_blocks_; - - /// Number of rows stored in the stream. - int64_t num_rows_; - - /// Counters added by this object to the parent runtime profile. - RuntimeProfile::Counter* pin_timer_; - RuntimeProfile::Counter* unpin_timer_; - RuntimeProfile::Counter* get_new_block_timer_; - - /// If true, read and write operations may be interleaved. Otherwise all calls - /// to AddRow() must occur before calling PrepareForRead() and subsequent calls to - /// GetNext(). - const bool read_write_; - - /// Whether any tuple in the rows is nullable. - const bool has_nullable_tuple_; - - /// If true, this stream is still using small buffers. - bool use_small_buffers_; - - /// If true, blocks are deleted after they are read. - bool delete_on_read_; - - bool closed_; // Used for debugging. - - /// If true, this stream has been explicitly pinned by the caller. This changes the - /// memory management of the stream. The blocks are not unpinned until the caller calls - /// UnpinAllBlocks(). If false, only the write_block_ and/or read_block_ are pinned - /// (both are if read_write_ is true). - bool pinned_; - - /// The slow path for AddRow() that is called if there is not sufficient space in - /// the current block. - bool AddRowSlow(TupleRow* row, Status* status) noexcept; - - /// Copies 'row' into write_block_. Returns false if there is not enough space in - /// 'write_block_'. 
After returning false, write_ptr_ may be left pointing to the - /// partially-written row, and no more data can be written to write_block_. - template <bool HAS_NULLABLE_TUPLE> - bool DeepCopyInternal(TupleRow* row) noexcept; - - /// Helper function to copy strings in string_slots from tuple into write_block_. - /// Updates write_ptr_ to the end of the string data added. Returns false if the data - /// does not fit in the current write block. After returning false, write_ptr_ is left - /// pointing to the partially-written row, and no more data can be written to - /// write_block_. - bool CopyStrings(const Tuple* tuple, const std::vector<SlotDescriptor*>& string_slots); - - /// Helper function to deep copy collections in collection_slots from tuple into - /// write_block_. Updates write_ptr_ to the end of the collection data added. Returns - /// false if the data does not fit in the current write block.. After returning false, - /// write_ptr_ is left pointing to the partially-written row, and no more data can be - /// written to write_block_. - bool CopyCollections(const Tuple* tuple, - const std::vector<SlotDescriptor*>& collection_slots); - - /// Wrapper of the templated DeepCopyInternal() function. - bool DeepCopy(TupleRow* row) noexcept; - - /// Gets a new block of 'block_len' bytes from the block_mgr_, updating write_block_, - /// write_tuple_idx_, write_ptr_ and write_end_ptr_. 'null_indicators_size' is the - /// number of bytes that will be reserved in the block for the null indicators bitmap. - /// *got_block is set to true if a block was successfully acquired. Null indicators - /// (if any) will also be reserved and initialized. If there are no blocks available, - /// *got_block is set to false and write_block_ is unchanged. - Status NewWriteBlock( - int64_t block_len, int64_t null_indicators_size, bool* got_block) noexcept; - - /// A wrapper around NewWriteBlock(). 'row_size' is the size of the tuple row to be - /// appended to this block. 
This function determines the block size required in order - /// to fit the row and null indicators. - Status NewWriteBlockForRow(int64_t row_size, bool* got_block) noexcept; - - /// Reads the next block from the block_mgr_. This blocks if necessary. - /// Updates read_block_, read_ptr_, read_tuple_idx_ and read_end_ptr_. - Status NextReadBlock(); - - /// Returns the total additional bytes that this row will consume in write_block_ if - /// appended to the block. This includes the fixed length part of the row and the - /// data for inlined_string_slots_ and inlined_coll_slots_. - int64_t ComputeRowSize(TupleRow* row) const noexcept; - - /// Unpins block if it is an IO-sized block and updates tracking stats. - Status UnpinBlock(BufferedBlockMgr::Block* block); - - /// Templated GetNext implementations. - template <bool FILL_INDICES> - Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices); - template <bool FILL_INDICES, bool HAS_NULLABLE_TUPLE> - Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices); - - /// Helper function for GetNextInternal(). For each string slot in string_slots, - /// update StringValue's ptr field to point to the corresponding string data stored - /// inline in the stream (at the current value of read_ptr_) advance read_ptr_ by the - /// StringValue's length field. - void FixUpStringsForRead(const vector<SlotDescriptor*>& string_slots, Tuple* tuple); - - /// Helper function for GetNextInternal(). For each collection slot in collection_slots, - /// recursively update any pointers in the CollectionValue to point to the corresponding - /// var len data stored inline in the stream, advancing read_ptr_ as data is read. - /// Assumes that the collection was serialized to the stream in DeepCopy()'s format. - void FixUpCollectionsForRead(const vector<SlotDescriptor*>& collection_slots, - Tuple* tuple); - - /// Computes the number of bytes needed for null indicators for a block of 'block_size'. 
- /// Return 0 if no tuple is nullable. Return -1 if a single row of fixed-size tuples - /// plus its null indicator (if any) cannot fit in the block. - int ComputeNumNullIndicatorBytes(int block_size) const; - - uint32_t read_block_bytes_remaining() const { - DCHECK_GE(read_end_ptr_, read_ptr_); - DCHECK_LE(read_end_ptr_ - read_ptr_, (*read_block_)->buffer_len()); - return read_end_ptr_ - read_ptr_; - } - - uint32_t write_block_bytes_remaining() const { - DCHECK_GE(write_end_ptr_, write_ptr_); - DCHECK_LE(write_end_ptr_ - write_ptr_, write_block_->buffer_len()); - return write_end_ptr_ - write_ptr_; - } - -}; - -} - -#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/buffered-tuple-stream.inline.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/buffered-tuple-stream.inline.h b/be/src/runtime/buffered-tuple-stream.inline.h deleted file mode 100644 index ba6bb8c..0000000 --- a/be/src/runtime/buffered-tuple-stream.inline.h +++ /dev/null @@ -1,59 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#ifndef IMPALA_RUNTIME_TUPLE_BUFFERED_STREAM_INLINE_H -#define IMPALA_RUNTIME_TUPLE_BUFFERED_STREAM_INLINE_H - -#include "runtime/buffered-tuple-stream.h" - -#include "runtime/descriptors.h" -#include "runtime/tuple-row.h" - -namespace impala { - -inline bool BufferedTupleStream::AddRow(TupleRow* row, Status* status) noexcept { - DCHECK(!closed_); - if (LIKELY(DeepCopy(row))) return true; - return AddRowSlow(row, status); -} - -inline uint8_t* BufferedTupleStream::AllocateRow(int fixed_size, int varlen_size, - uint8_t** varlen_data, Status* status) { - DCHECK(!closed_); - DCHECK(!has_nullable_tuple_) << "AllocateRow does not support nullable tuples"; - const int total_size = fixed_size + varlen_size; - if (UNLIKELY(write_block_ == NULL || write_block_bytes_remaining() < total_size)) { - bool got_block; - *status = NewWriteBlockForRow(total_size, &got_block); - if (!status->ok() || !got_block) return NULL; - } - DCHECK(write_block_ != NULL); - DCHECK(write_block_->is_pinned()); - DCHECK_GE(write_block_bytes_remaining(), total_size); - ++num_rows_; - write_block_->AddRow(); - - uint8_t* fixed_data = write_ptr_; - write_ptr_ += fixed_size; - *varlen_data = write_ptr_; - write_ptr_ += varlen_size; - return fixed_data; -} - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/bufferpool/buffer-pool.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc index 9b16112..83f2e6a 100644 --- a/be/src/runtime/bufferpool/buffer-pool.cc +++ b/be/src/runtime/bufferpool/buffer-pool.cc @@ -308,6 +308,16 @@ int64_t BufferPool::ClientHandle::GetUnusedReservation() const { return impl_->reservation()->GetUnusedReservation(); } +bool BufferPool::ClientHandle::TransferReservationFrom( + ReservationTracker* src, int64_t bytes) { + return src->TransferReservationTo(impl_->reservation(), bytes); +} + +bool 
BufferPool::ClientHandle::TransferReservationTo( + ReservationTracker* dst, int64_t bytes) { + return impl_->reservation()->TransferReservationTo(dst, bytes); +} + void BufferPool::ClientHandle::SaveReservation(SubReservation* dst, int64_t bytes) { DCHECK_EQ(dst->tracker_->parent(), impl_->reservation()); bool success = impl_->reservation()->TransferReservationTo(dst->tracker_.get(), bytes); @@ -355,7 +365,7 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group, RuntimeProfile* child_profile = profile->CreateChild("Buffer pool", true, true); reservation_.InitChildTracker( child_profile, parent_reservation, mem_tracker, reservation_limit); - counters_.alloc_time = ADD_TIMER(profile, "AllocTime"); + counters_.alloc_time = ADD_TIMER(child_profile, "AllocTime"); counters_.cumulative_allocations = ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT); counters_.cumulative_bytes_alloced = http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/bufferpool/buffer-pool.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h index f2ff99b..e3df8df 100644 --- a/be/src/runtime/bufferpool/buffer-pool.h +++ b/be/src/runtime/bufferpool/buffer-pool.h @@ -338,6 +338,14 @@ class BufferPool::ClientHandle { int64_t GetUsedReservation() const; int64_t GetUnusedReservation() const; + /// Try to transfer 'bytes' of reservation from 'src' to this client using + /// ReservationTracker::TransferReservationTo(). + bool TransferReservationFrom(ReservationTracker* src, int64_t bytes); + + /// Transfer 'bytes' of reservation from this client to 'dst' using + /// ReservationTracker::TransferReservationTo(). 
+ bool TransferReservationTo(ReservationTracker* dst, int64_t bytes); + bool is_registered() const { return impl_ != NULL; } std::string DebugString() const; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/bufferpool/reservation-tracker.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h index 4d525c0..80084bc 100644 --- a/be/src/runtime/bufferpool/reservation-tracker.h +++ b/be/src/runtime/bufferpool/reservation-tracker.h @@ -127,6 +127,10 @@ class ReservationTracker { /// Returns true if the reservation increase was successful or not necessary. bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT; + /// Decrease reservation by 'bytes' on this tracker and all ancestors. This tracker's + /// reservation must be at least 'bytes' before calling this method. + void DecreaseReservation(int64_t bytes) { DecreaseReservation(bytes, false); } + /// Transfer reservation from this tracker to 'other'. Both trackers must be in the /// same query subtree of the hierarchy. One tracker can be the ancestor of the other, /// or they can share a common ancestor. The subtree root must be at the query level http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/disk-io-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc index 3393ab3..55042d8 100644 --- a/be/src/runtime/disk-io-mgr.cc +++ b/be/src/runtime/disk-io-mgr.cc @@ -83,7 +83,7 @@ DEFINE_int32(num_adls_io_threads, 16, "Number of ADLS I/O threads"); // not introduce seeks. The literature seems to agree that with 8 MB reads, random // io and sequential io perform similarly. 
DEFINE_int32(read_size, 8 * 1024 * 1024, "Read Size (in bytes)"); -DEFINE_int32(min_buffer_size, 1024, "The minimum read buffer size (in bytes)"); +DECLARE_int64(min_buffer_size); // With 1024B through 8MB buffers, this is up to ~2GB of buffers. DEFINE_int32(max_free_io_buffers, 128, @@ -937,9 +937,8 @@ void DiskIoMgr::HandleWriteFinished( int disk_id = write_range->disk_id_; // Execute the callback before decrementing the thread count. Otherwise CancelContext() - // that waits for the disk ref count to be 0 will return, creating a race, e.g. - // between BufferedBlockMgr::WriteComplete() and BufferedBlockMgr::~BufferedBlockMgr(). - // See IMPALA-1890. + // that waits for the disk ref count to be 0 will return, creating a race, e.g. see + // IMPALA-1890. // The status of the write does not affect the status of the writer context. write_range->callback_(write_status); { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/exec-env.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc index 960e3c9..f2ee6f0 100644 --- a/be/src/runtime/exec-env.cc +++ b/be/src/runtime/exec-env.cc @@ -75,6 +75,8 @@ DEFINE_int32(state_store_subscriber_port, 23000, DEFINE_int32(num_hdfs_worker_threads, 16, "(Advanced) The number of threads in the global HDFS operation pool"); DEFINE_bool(disable_admission_control, false, "Disables admission control."); +DEFINE_int64(min_buffer_size, 64 * 1024, + "(Advanced) The minimum buffer size to use in the buffer pool"); DECLARE_int32(state_store_port); DECLARE_int32(num_threads_per_core); @@ -204,13 +206,14 @@ Status ExecEnv::StartServices() { // memory limit either based on the available physical memory, or if overcommitting // is turned off, we use the memory commit limit from /proc/meminfo (see // IMPALA-1690). - // --mem_limit="" means no memory limit + // --mem_limit="" means no memory limit. 
TODO: IMPALA-5652: deprecate this mode int64_t bytes_limit = 0; bool is_percent; + int64_t system_mem; if (MemInfo::vm_overcommit() == 2 && MemInfo::commit_limit() < MemInfo::physical_mem()) { - bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, - MemInfo::commit_limit()); + system_mem = MemInfo::commit_limit(); + bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, system_mem); // There might be the case of misconfiguration, when on a system swap is disabled // and overcommitting is turned off the actual usable memory is less than the // available physical memory. @@ -225,14 +228,23 @@ Status ExecEnv::StartServices() { << "/proc/sys/vm/overcommit_memory and " << "/proc/sys/vm/overcommit_ratio."; } else { - bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, - MemInfo::physical_mem()); + system_mem = MemInfo::physical_mem(); + bytes_limit = ParseUtil::ParseMemSpec(FLAGS_mem_limit, &is_percent, system_mem); } - + // ParseMemSpec returns 0 to mean unlimited. TODO: IMPALA-5652: deprecate this mode. + bool no_process_mem_limit = bytes_limit == 0; if (bytes_limit < 0) { return Status("Failed to parse mem limit from '" + FLAGS_mem_limit + "'."); } + if (!BitUtil::IsPowerOf2(FLAGS_min_buffer_size)) { + return Status(Substitute( + "--min_buffer_size must be a power-of-two: $0", FLAGS_min_buffer_size)); + } + int64_t buffer_pool_capacity = BitUtil::RoundDown( + no_process_mem_limit ? system_mem : bytes_limit * 4 / 5, FLAGS_min_buffer_size); + InitBufferPool(FLAGS_min_buffer_size, buffer_pool_capacity); + metrics_->Init(enable_webserver_ ? webserver_.get() : nullptr); impalad_client_cache_->InitMetrics(metrics_.get(), "impala-server.backends"); catalogd_client_cache_->InitMetrics(metrics_.get(), "catalog.server"); @@ -240,8 +252,8 @@ Status ExecEnv::StartServices() { metrics_.get(), true, buffer_reservation_.get(), buffer_pool_.get())); // Limit of -1 means no memory limit. 
- mem_tracker_.reset(new MemTracker( - AggregateMemoryMetrics::TOTAL_USED, bytes_limit > 0 ? bytes_limit : -1, "Process")); + mem_tracker_.reset(new MemTracker(AggregateMemoryMetrics::TOTAL_USED, + no_process_mem_limit ? -1 : bytes_limit, "Process")); if (buffer_pool_ != nullptr) { // Add BufferPool MemTrackers for cached memory that is not tracked against queries // but is included in process memory consumption. @@ -270,6 +282,8 @@ Status ExecEnv::StartServices() { } LOG(INFO) << "Using global memory limit: " << PrettyPrinter::Print(bytes_limit, TUnit::BYTES); + LOG(INFO) << "Buffer pool capacity: " + << PrettyPrinter::Print(buffer_pool_capacity, TUnit::BYTES); RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get())); @@ -310,9 +324,8 @@ Status ExecEnv::StartServices() { return Status::OK(); } -void ExecEnv::InitBufferPool(int64_t min_page_size, int64_t capacity) { - DCHECK(buffer_pool_ == nullptr); - buffer_pool_.reset(new BufferPool(min_page_size, capacity)); +void ExecEnv::InitBufferPool(int64_t min_buffer_size, int64_t capacity) { + buffer_pool_.reset(new BufferPool(min_buffer_size, capacity)); buffer_reservation_.reset(new ReservationTracker()); buffer_reservation_->InitRootTracker(nullptr, capacity); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/exec-env.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/exec-env.h b/be/src/runtime/exec-env.h index 4674072..63d2e0b 100644 --- a/be/src/runtime/exec-env.h +++ b/be/src/runtime/exec-env.h @@ -159,8 +159,8 @@ class ExecEnv { boost::scoped_ptr<QueryExecMgr> query_exec_mgr_; /// Query-wide buffer pool and the root reservation tracker for the pool. The - /// reservation limit is equal to the maximum capacity of the pool. - /// For now this is only used by backend tests that create them via InitBufferPool(); + /// reservation limit is equal to the maximum capacity of the pool. 
Created in + /// InitBufferPool(); boost::scoped_ptr<ReservationTracker> buffer_reservation_; boost::scoped_ptr<BufferPool> buffer_pool_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/fragment-instance-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc index 2385eab..07b3f1c 100644 --- a/be/src/runtime/fragment-instance-state.cc +++ b/be/src/runtime/fragment-instance-state.cc @@ -126,8 +126,6 @@ Status FragmentInstanceState::Prepare() { profile()->AddChild(timings_profile_); SCOPED_TIMER(ADD_TIMER(timings_profile_, PREPARE_TIMER_NAME)); - // TODO: move this into a RuntimeState::Init() - RETURN_IF_ERROR(runtime_state_->CreateBlockMgr()); runtime_state_->InitFilterBank(); // Reserve one main thread from the pool http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/initial-reservations.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/initial-reservations.cc b/be/src/runtime/initial-reservations.cc new file mode 100644 index 0000000..4987ec3 --- /dev/null +++ b/be/src/runtime/initial-reservations.cc @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/initial-reservations.h" + +#include <limits> + +#include <boost/thread/mutex.hpp> +#include <gflags/gflags.h> + +#include "common/logging.h" +#include "common/object-pool.h" +#include "runtime/exec-env.h" +#include "runtime/mem-tracker.h" +#include "util/debug-util.h" + +#include "common/names.h" + +using std::numeric_limits; + +DECLARE_int32(be_port); +DECLARE_string(hostname); + +namespace impala { + +InitialReservations::InitialReservations(ObjectPool* obj_pool, + ReservationTracker* query_reservation, MemTracker* query_mem_tracker, + int64_t initial_reservation_total_claims) + : remaining_initial_reservation_claims_(initial_reservation_total_claims) { + MemTracker* initial_reservation_tracker = obj_pool->Add( + new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false)); + initial_reservations_.InitChildTracker(nullptr, query_reservation, + initial_reservation_tracker, numeric_limits<int64_t>::max()); +} + +Status InitialReservations::Init( + const TUniqueId& query_id, int64_t query_min_reservation) { + DCHECK_EQ(0, initial_reservations_.GetReservation()) << "Already inited"; + if (!initial_reservations_.IncreaseReservation(query_min_reservation)) { + return Status(TErrorCode::MINIMUM_RESERVATION_UNAVAILABLE, + PrettyPrinter::Print(query_min_reservation, TUnit::BYTES), FLAGS_hostname, + FLAGS_be_port, PrintId(query_id), + ExecEnv::GetInstance()->process_mem_tracker()->LogUsage()); + } + VLOG_QUERY << "Successfully claimed initial reservations (" + << PrettyPrinter::Print(query_min_reservation, TUnit::BYTES) << ") for" + << " query " << PrintId(query_id); + return Status::OK(); +} + +void InitialReservations::Claim(BufferPool::ClientHandle* dst, int64_t bytes) { + DCHECK_GE(bytes, 0); + lock_guard<SpinLock> l(lock_); + DCHECK_LE(bytes, remaining_initial_reservation_claims_); + bool success = 
dst->TransferReservationFrom(&initial_reservations_, bytes); + DCHECK(success) << "Planner computation should ensure enough initial reservations"; + remaining_initial_reservation_claims_ -= bytes; +} + +void InitialReservations::Return(BufferPool::ClientHandle* src, int64_t bytes) { + lock_guard<SpinLock> l(lock_); + bool success = src->TransferReservationTo(&initial_reservations_, bytes); + // No limits on our tracker - no way this should fail. + DCHECK(success); + // Check to see if we can release any reservation. + int64_t excess_reservation = + initial_reservations_.GetReservation() - remaining_initial_reservation_claims_; + if (excess_reservation > 0) { + initial_reservations_.DecreaseReservation(excess_reservation); + } +} + +void InitialReservations::ReleaseResources() { + initial_reservations_.Close(); +} +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/initial-reservations.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/initial-reservations.h b/be/src/runtime/initial-reservations.h new file mode 100644 index 0000000..dfcb114 --- /dev/null +++ b/be/src/runtime/initial-reservations.h @@ -0,0 +1,79 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef IMPALA_RUNTIME_INITIAL_RESERVATIONS_H +#define IMPALA_RUNTIME_INITIAL_RESERVATIONS_H + +#include "common/status.h" +#include "gen-cpp/Types_types.h" // for TUniqueId +#include "runtime/bufferpool/buffer-pool.h" +#include "runtime/bufferpool/reservation-tracker.h" +#include "util/spinlock.h" + +namespace impala { + +class ObjectPool; + +/** + * Manages the pool of initial reservations for different nodes in the plan tree. + * Each plan node and sink claims its initial reservation from here, then returns it when + * it is done executing. The frontend is responsible for making sure that enough initial + * reservation is in this pool for all of the concurrent claims. + */ +class InitialReservations { + public: + /// 'query_reservation' and 'query_mem_tracker' are the top-level trackers for the + /// query. This creates trackers for initial reservations under those. + /// 'initial_reservation_total_claims' is the total of initial reservations that will be + /// claimed over the lifetime of the query. The total bytes claimed via Claim() + /// cannot exceed this. Allocated objects are stored in 'obj_pool'. + InitialReservations(ObjectPool* obj_pool, ReservationTracker* query_reservation, + MemTracker* query_mem_tracker, int64_t initial_reservation_total_claims); + + /// Initialize the query's pool of initial reservations by acquiring the minimum + /// reservation required for the query on this host. Fails if the reservation could + /// not be acquired, e.g. because it would exceed a pool or process limit. + Status Init( + const TUniqueId& query_id, int64_t query_min_reservation) WARN_UNUSED_RESULT; + + /// Claim the initial reservation of 'bytes' for 'dst'. Assumes that the transfer will + /// not violate any reservation limits on 'dst'. 
+ void Claim(BufferPool::ClientHandle* dst, int64_t bytes); + + /// Return the initial reservation of 'bytes' from 'src'. The reservation is returned + /// to the pool of reservations if it may be needed to satisfy a subsequent claim or + /// otherwise is released. + void Return(BufferPool::ClientHandle* src, int64_t bytes); + + /// Release any reservations held onto by this object. + void ReleaseResources(); + + private: + // Protects all below members to ensure that the internal state is consistent. + SpinLock lock_; + + // The pool of initial reservations that Claim() returns reservations from and + // Return() returns reservations to. + ReservationTracker initial_reservations_; + + /// The total bytes of additional reservations that we expect to be claimed. + /// initial_reservations_.GetReservation() <= remaining_initial_reservation_claims_. + int64_t remaining_initial_reservation_claims_; +}; +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/query-exec-mgr.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-exec-mgr.cc b/be/src/runtime/query-exec-mgr.cc index 6057b52..22c2826 100644 --- a/be/src/runtime/query-exec-mgr.cc +++ b/be/src/runtime/query-exec-mgr.cc @@ -124,6 +124,8 @@ void QueryExecMgr::StartQueryHelper(QueryState* qs) { } #endif + // decrement refcount taken in QueryState::Init() + qs->ReleaseInitialReservationRefcount(); // decrement refcount taken in StartQuery() ReleaseQueryState(qs); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/query-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc index 21f35fb..64a8c5a 100644 --- a/be/src/runtime/query-state.cc +++ b/be/src/runtime/query-state.cc @@ -21,11 +21,12 @@ #include <boost/thread/locks.hpp> #include "exprs/expr.h" +#include "runtime/backend-client.h"
#include "runtime/bufferpool/buffer-pool.h" #include "runtime/bufferpool/reservation-tracker.h" -#include "runtime/backend-client.h" #include "runtime/exec-env.h" #include "runtime/fragment-instance-state.h" +#include "runtime/initial-reservations.h" #include "runtime/mem-tracker.h" #include "runtime/query-exec-mgr.h" #include "runtime/runtime-state.h" @@ -37,6 +38,20 @@ using namespace impala; +// The fraction of the query mem limit that is used for buffer reservations. Most +// operators that accumulate memory use reservations, so the majority of memory should +// be allocated to buffer reservations, as a heuristic. +// TODO: this will go away once all operators use buffer reservations. +static const double RESERVATION_MEM_FRACTION = 0.8; + +// The minimum amount of memory that should be left after buffer reservations. +// The limit on reservations is computed as: +// min(query_limit * RESERVATION_MEM_FRACTION, +// query_limit - RESERVATION_MEM_MIN_REMAINING) +// TODO: this will go away once all operators use buffer reservations and we have accurate +// minimum requirements. +static const int64_t RESERVATION_MEM_MIN_REMAINING = 100 * 1024 * 1024; + QueryState::ScopedRef::ScopedRef(const TUniqueId& query_id) { DCHECK(ExecEnv::GetInstance()->query_exec_mgr() != nullptr); query_state_ = ExecEnv::GetInstance()->query_exec_mgr()->GetQueryState(query_id); @@ -49,8 +64,10 @@ QueryState::ScopedRef::~ScopedRef() { QueryState::QueryState(const TQueryCtx& query_ctx, const string& request_pool) : query_ctx_(query_ctx), + initial_reservation_refcnt_(0), refcnt_(0), - is_cancelled_(0) { + is_cancelled_(0), + query_spilled_(0) { if (query_ctx_.request_pool.empty()) { // fix up pool name for tests DCHECK(!request_pool.empty()); @@ -75,6 +92,7 @@ void QueryState::ReleaseResources() { // Clean up temporary files. if (file_group_ != nullptr) file_group_->Close(); // Release any remaining reservation. 
+ if (initial_reservations_ != nullptr) initial_reservations_->ReleaseResources(); if (buffer_reservation_ != nullptr) buffer_reservation_->Close(); // Avoid dangling reference from the parent of 'query_mem_tracker_'. if (query_mem_tracker_ != nullptr) query_mem_tracker_->UnregisterFromParent(); @@ -85,6 +103,7 @@ void QueryState::ReleaseResources() { QueryState::~QueryState() { DCHECK(released_resources_); DCHECK_EQ(refcnt_.Load(), 0); + DCHECK_EQ(initial_reservation_refcnt_.Load(), 0); } Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) { @@ -99,9 +118,8 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) { "is over its memory limit", PrintId(query_id())); RETURN_IF_ERROR(process_mem_tracker->MemLimitExceeded(NULL, msg, 0)); } - // Do buffer-pool-related setup if running in a backend test that explicitly created - // the pool. - if (exec_env->buffer_pool() != nullptr) RETURN_IF_ERROR(InitBufferPoolState()); + + RETURN_IF_ERROR(InitBufferPoolState()); // don't copy query_ctx, it's large and we already did that in the c'tor rpc_params_.__set_coord_state_idx(rpc_params.coord_state_idx); @@ -112,6 +130,15 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) { rpc_params_.fragment_instance_ctxs.swap(non_const_params.fragment_instance_ctxs); rpc_params_.__isset.fragment_instance_ctxs = true; + // Claim the query-wide minimum reservation. Do this last so that we don't need + // to handle releasing it if a later step fails. + initial_reservations_ = obj_pool_.Add(new InitialReservations(&obj_pool_, + buffer_reservation_, query_mem_tracker_, + query_ctx_.per_host_initial_reservation_total_claims)); + RETURN_IF_ERROR( + initial_reservations_->Init(query_id(), query_ctx_.per_host_min_reservation)); + DCHECK_EQ(0, initial_reservation_refcnt_.Load()); + initial_reservation_refcnt_.Add(1); // Decremented in QueryExecMgr::StartQueryHelper(). 
return Status::OK(); } @@ -129,19 +156,23 @@ void QueryState::InitMemTrackers() { Status QueryState::InitBufferPoolState() { ExecEnv* exec_env = ExecEnv::GetInstance(); - int64_t query_mem_limit = query_mem_tracker_->limit(); - if (query_mem_limit == -1) query_mem_limit = numeric_limits<int64_t>::max(); - - // TODO: IMPALA-3200: add a default upper bound to buffer pool memory derived from - // query_mem_limit. - int64_t max_reservation = numeric_limits<int64_t>::max(); - if (query_options().__isset.max_block_mgr_memory - && query_options().max_block_mgr_memory > 0) { - max_reservation = query_options().max_block_mgr_memory; + int64_t mem_limit = query_mem_tracker_->lowest_limit(); + int64_t max_reservation; + if (query_options().__isset.buffer_pool_limit + && query_options().buffer_pool_limit > 0) { + max_reservation = query_options().buffer_pool_limit; + } else if (mem_limit == -1) { + // No query mem limit. The process-wide reservation limit is the only limit on + // reservations. + max_reservation = numeric_limits<int64_t>::max(); + } else { + DCHECK_GE(mem_limit, 0); + max_reservation = min<int64_t>( + mem_limit * RESERVATION_MEM_FRACTION, mem_limit - RESERVATION_MEM_MIN_REMAINING); + max_reservation = max<int64_t>(0, max_reservation); } + VLOG_QUERY << "Buffer pool limit for " << PrintId(query_id()) << ": " << max_reservation; - // TODO: IMPALA-3748: claim the query-wide minimum reservation. - // For now, rely on exec nodes to grab their minimum reservation during Prepare(). 
buffer_reservation_ = obj_pool_.Add(new ReservationTracker); buffer_reservation_->InitChildTracker( NULL, exec_env->buffer_reservation(), query_mem_tracker_, max_reservation); @@ -256,6 +287,7 @@ void QueryState::StartFInstances() { VLOG_QUERY << "StartFInstances(): query_id=" << PrintId(query_id()) << " #instances=" << rpc_params_.fragment_instance_ctxs.size(); DCHECK_GT(refcnt_.Load(), 0); + DCHECK_GT(initial_reservation_refcnt_.Load(), 0) << "Should have been taken in Init()"; // set up desc tbl DCHECK(query_ctx().__isset.desc_tbl); @@ -290,6 +322,7 @@ void QueryState::StartFInstances() { // start new thread to execute instance refcnt_.Add(1); // decremented in ExecFInstance() + initial_reservation_refcnt_.Add(1); // decremented in ExecFInstance() string thread_name = Substitute( "exec-finstance (finst:$0)", PrintId(instance_ctx.fragment_instance_id)); Thread t(FragmentInstanceState::FINST_THREAD_GROUP_NAME, thread_name, @@ -311,6 +344,12 @@ void QueryState::StartFInstances() { instances_prepared_promise_.Set(prepare_status); } +void QueryState::ReleaseInitialReservationRefcount() { + int32_t new_val = initial_reservation_refcnt_.Add(-1); + DCHECK_GE(new_val, 0); + if (new_val == 0) initial_reservations_->ReleaseResources(); +} + void QueryState::ExecFInstance(FragmentInstanceState* fis) { ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS_IN_FLIGHT->Increment(1L); ImpaladMetrics::IMPALA_SERVER_NUM_FRAGMENTS->Increment(1L); @@ -327,6 +366,8 @@ void QueryState::ExecFInstance(FragmentInstanceState* fis) { // initiate cancellation if nobody has done so yet if (!status.ok()) Cancel(); // decrement refcount taken in StartFInstances() + ReleaseInitialReservationRefcount(); + // decrement refcount taken in StartFInstances() ExecEnv::GetInstance()->query_exec_mgr()->ReleaseQueryState(this); } @@ -345,3 +386,21 @@ void QueryState::PublishFilter(int32_t filter_id, int fragment_idx, fis->PublishFilter(filter_id, thrift_bloom_filter); } } + +Status 
QueryState::StartSpilling(RuntimeState* runtime_state, MemTracker* mem_tracker) { + // Return an error message with the root cause of why spilling is disabled. + if (query_options().scratch_limit == 0) { + return mem_tracker->MemLimitExceeded( + runtime_state, "Could not free memory by spilling to disk: scratch_limit is 0"); + } else if (query_ctx_.disable_spilling) { + return mem_tracker->MemLimitExceeded(runtime_state, + "Could not free memory by spilling to disk: spilling was disabled by planner. " + "Re-enable spilling by setting the query option DISABLE_UNSAFE_SPILLS=false"); + } + // 'file_group_' must be non-NULL for spilling to be enabled. + DCHECK(file_group_ != nullptr); + if (query_spilled_.CompareAndSwap(0, 1)) { + ImpaladMetrics::NUM_QUERIES_SPILLED->Increment(1); + } + return Status::OK(); +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/query-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/query-state.h b/be/src/runtime/query-state.h index 9ce4316..fc71772 100644 --- a/be/src/runtime/query-state.h +++ b/be/src/runtime/query-state.h @@ -34,8 +34,10 @@ namespace impala { class FragmentInstanceState; +class InitialReservations; class MemTracker; class ReservationTracker; +class RuntimeState; /// Central class for all backend execution state (example: the FragmentInstanceStates /// of the individual fragment instances) created for a particular query. 
@@ -110,6 +112,7 @@ class QueryState { // the following getters are only valid after Prepare() ReservationTracker* buffer_reservation() const { return buffer_reservation_; } + InitialReservations* initial_reservations() const { return initial_reservations_; } TmpFileMgr::FileGroup* file_group() const { return file_group_; } const TExecQueryFInstancesParams& rpc_params() const { return rpc_params_; } @@ -117,8 +120,10 @@ class QueryState { const DescriptorTbl& desc_tbl() const { return *desc_tbl_; } /// Sets up state required for fragment execution: memory reservations, etc. Fails - /// if resources could not be acquired. Uses few cycles and never blocks. - /// Not idempotent, not thread-safe. + /// if resources could not be acquired. On success, acquires an initial reservation + /// refcount for the caller, which the caller must release by calling + /// ReleaseInitialReservationRefcount(). + /// Uses few cycles and never blocks. Not idempotent, not thread-safe. /// The remaining public functions must be called only after Init(). Status Init(const TExecQueryFInstancesParams& rpc_params) WARN_UNUSED_RESULT; @@ -155,6 +160,12 @@ class QueryState { /// If there is an error during the rpc, initiates cancellation. void ReportExecStatus(bool done, const Status& status, FragmentInstanceState* fis); + /// Checks whether spilling is enabled for this query. Must be called before the first + /// call to BufferPool::Unpin() for the query. Returns OK if spilling is enabled. If + /// spilling is not enabled, logs a MEM_LIMIT_EXCEEDED error from + /// mem_tracker->MemLimitExceeded() to 'runtime_state'.
+ Status StartSpilling(RuntimeState* runtime_state, MemTracker* mem_tracker); + ~QueryState(); private: @@ -162,6 +173,7 @@ class QueryState { /// test execution friend class RuntimeState; + friend class TestEnv; static const int DEFAULT_BATCH_SIZE = 1024; @@ -176,16 +188,21 @@ class QueryState { /// TODO: find a way not to have to copy this TExecQueryFInstancesParams rpc_params_; - /// Buffer reservation for this query (owned by obj_pool_) - /// Only non-null in backend tests that explicitly enabled the new buffer pool - /// Set in Prepare(). - /// TODO: this will always be non-null once IMPALA-3200 is done + /// Buffer reservation for this query (owned by obj_pool_). Set in Init(). ReservationTracker* buffer_reservation_ = nullptr; - /// Temporary files for this query (owned by obj_pool_) - /// Only non-null in backend tests the explicitly enabled the new buffer pool - /// Set in Prepare(). - /// TODO: this will always be non-null once IMPALA-3200 is done + /// Pool of buffer reservations used to distribute initial reservations to operators + /// in the query. Contains a ReservationTracker that is a child of + /// 'buffer_reservation_'. Owned by 'obj_pool_'. Set in Init(). + InitialReservations* initial_reservations_ = nullptr; + + /// Number of fragment instances executing, which may need to claim + /// from 'initial_reservations_'. + /// TODO: not needed if we call ReleaseResources() in a timely manner (IMPALA-1575). + AtomicInt32 initial_reservation_refcnt_; + + /// Temporary files for this query (owned by obj_pool_). Non-null if spilling is + /// enabled. Set in Init(). TmpFileMgr::FileGroup* file_group_ = nullptr; /// created in StartFInstances(), owned by obj_pool_ @@ -214,6 +231,11 @@ class QueryState { /// True if and only if ReleaseResources() has been called. bool released_resources_ = false; + /// Whether the query has spilled. 0 if the query has not spilled. Atomically set to 1 + /// when the query first starts to spill.
Required to correctly maintain the + /// "num-queries-spilled" metric. + AtomicInt32 query_spilled_; + /// Create QueryState w/ refcnt of 0. /// The query is associated with the resource pool query_ctx.request_pool or /// 'request_pool', if the former is not set (needed for tests). @@ -222,13 +244,16 @@ class QueryState { /// Execute the fragment instance and decrement the refcnt when done. void ExecFInstance(FragmentInstanceState* fis); - /// Called from Prepare() to initialize MemTrackers. + /// Called from constructor to initialize MemTrackers. void InitMemTrackers(); - /// Called from Prepare() to setup buffer reservations and the - /// file group. Fails if required resources are not available. + /// Called from Init() to set up buffer reservations and the file group. Status InitBufferPoolState() WARN_UNUSED_RESULT; + /// Decrement 'initial_reservation_refcnt_' and release the initial reservation if it + /// goes to zero. + void ReleaseInitialReservationRefcount(); + /// Same behavior as ReportExecStatus(). /// Cancel on error only if instances_started is true. 
void ReportExecStatusAux(bool done, const Status& status, FragmentInstanceState* fis, http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/row-batch.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index 11cf363..942ac05 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -147,9 +147,6 @@ RowBatch::~RowBatch() { for (int i = 0; i < io_buffers_.size(); ++i) { ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i])); } - for (int i = 0; i < blocks_.size(); ++i) { - blocks_[i]->Delete(); - } for (BufferInfo& buffer_info : buffers_) { ExecEnv::GetInstance()->buffer_pool()->FreeBuffer( buffer_info.client, &buffer_info.buffer); @@ -295,14 +292,6 @@ void RowBatch::AddIoBuffer(unique_ptr<DiskIoMgr::BufferDescriptor> buffer) { io_buffers_.emplace_back(move(buffer)); } -void RowBatch::AddBlock(BufferedBlockMgr::Block* block, FlushMode flush) { - DCHECK(block != NULL); - DCHECK(block->is_pinned()); - blocks_.push_back(block); - auxiliary_mem_usage_ += block->buffer_len(); - if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources(); -} - void RowBatch::AddBuffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer, FlushMode flush) { auxiliary_mem_usage_ += buffer.len(); @@ -322,10 +311,6 @@ void RowBatch::Reset() { ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i])); } io_buffers_.clear(); - for (int i = 0; i < blocks_.size(); ++i) { - blocks_[i]->Delete(); - } - blocks_.clear(); for (BufferInfo& buffer_info : buffers_) { ExecEnv::GetInstance()->buffer_pool()->FreeBuffer( buffer_info.client, &buffer_info.buffer); @@ -342,10 +327,6 @@ void RowBatch::TransferResourceOwnership(RowBatch* dest) { dest->AddIoBuffer(move(io_buffers_[i])); } io_buffers_.clear(); - for (int i = 0; i < blocks_.size(); ++i) { - dest->AddBlock(blocks_[i], FlushMode::NO_FLUSH_RESOURCES); - } - 
blocks_.clear(); for (BufferInfo& buffer_info : buffers_) { dest->AddBuffer( buffer_info.client, std::move(buffer_info.buffer), FlushMode::NO_FLUSH_RESOURCES); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/row-batch.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h index 1b75ebb..35a8f14 100644 --- a/be/src/runtime/row-batch.h +++ b/be/src/runtime/row-batch.h @@ -25,7 +25,6 @@ #include "codegen/impala-ir.h" #include "common/compiler-util.h" #include "common/logging.h" -#include "runtime/buffered-block-mgr.h" #include "runtime/bufferpool/buffer-pool.h" #include "runtime/descriptors.h" #include "runtime/disk-io-mgr.h" @@ -207,7 +206,6 @@ class RowBatch { int row_byte_size() { return num_tuples_per_row_ * sizeof(Tuple*); } MemPool* tuple_data_pool() { return &tuple_data_pool_; } int num_io_buffers() const { return io_buffers_.size(); } - int num_blocks() const { return blocks_.size(); } int num_buffers() const { return buffers_.size(); } /// Resets the row batch, returning all resources it has accumulated. @@ -216,13 +214,6 @@ class RowBatch { /// Add io buffer to this row batch. void AddIoBuffer(std::unique_ptr<DiskIoMgr::BufferDescriptor> buffer); - /// Adds a block to this row batch. The block must be pinned. The blocks must be - /// deleted when freeing resources. The block's memory remains accounted against - /// the original owner, even when the ownership of batches is transferred. If the - /// original owner wants the memory to be released, it should call this with 'mode' - /// FLUSH_RESOURCES (see MarkFlushResources() for further explanation). - void AddBlock(BufferedBlockMgr::Block* block, FlushMode flush); - /// Adds a buffer to this row batch. The buffer is deleted when freeing resources. /// The buffer's memory remains accounted against the original owner, even when the /// ownership of batches is transferred. 
If the original owner wants the memory to be @@ -426,10 +417,6 @@ class RowBatch { /// (i.e. they are not ref counted) so most row batches don't own any. std::vector<std::unique_ptr<DiskIoMgr::BufferDescriptor>> io_buffers_; - /// Blocks attached to this row batch. The underlying memory and block manager client - /// are owned by the BufferedBlockMgr. - std::vector<BufferedBlockMgr::Block*> blocks_; - struct BufferInfo { BufferPool::ClientHandle* client; BufferPool::BufferHandle buffer; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/runtime-filter.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-filter.h b/be/src/runtime/runtime-filter.h index ab70d4a..7b6066a 100644 --- a/be/src/runtime/runtime-filter.h +++ b/be/src/runtime/runtime-filter.h @@ -23,6 +23,7 @@ #include "runtime/runtime-filter-bank.h" #include "util/bloom-filter.h" #include "util/spinlock.h" +#include "util/time.h" namespace impala { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/runtime-state.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc index 89eec29..ba8e75d 100644 --- a/be/src/runtime/runtime-state.cc +++ b/be/src/runtime/runtime-state.cc @@ -17,21 +17,21 @@ #include "runtime/runtime-state.h" -#include <iostream> #include <jni.h> +#include <iostream> #include <sstream> #include <string> -#include "common/logging.h" #include <boost/algorithm/string/join.hpp> #include <gutil/strings/substitute.h> +#include "common/logging.h" #include "codegen/llvm-codegen.h" #include "common/object-pool.h" #include "common/status.h" #include "exprs/scalar-expr.h" #include "exprs/scalar-fn-call.h" -#include "runtime/buffered-block-mgr.h" +#include "runtime/bufferpool/buffer-pool.h" #include "runtime/bufferpool/reservation-tracker.h" #include "runtime/data-stream-mgr.h" #include 
"runtime/data-stream-recvr.h" @@ -54,22 +54,10 @@ #include "common/names.h" using namespace llvm; +using strings::Substitute; DECLARE_int32(max_errors); -// The fraction of the query mem limit that is used for the block mgr. Operators -// that accumulate memory all use the block mgr so the majority of the memory should -// be allocated to the block mgr. The remaining memory is used by the non-spilling -// operators and should be independent of data size. -static const float BLOCK_MGR_MEM_FRACTION = 0.8f; - -// The minimum amount of memory that must be left after the block mgr reserves the -// BLOCK_MGR_MEM_FRACTION. The block limit is: -// min(query_limit * BLOCK_MGR_MEM_FRACTION, query_limit - BLOCK_MGR_MEM_MIN_REMAINING) -// TODO: this value was picked arbitrarily and the tests are written to rely on this -// for the minimum memory required to run the query. Revisit. -static const int64_t BLOCK_MGR_MEM_MIN_REMAINING = 100 * 1024 * 1024; - namespace impala { RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& fragment_ctx, @@ -82,7 +70,7 @@ RuntimeState::RuntimeState(QueryState* query_state, const TPlanFragmentCtx& frag query_state->query_ctx().utc_timestamp_string))), exec_env_(exec_env), profile_(obj_pool(), "Fragment " + PrintId(instance_ctx.fragment_instance_id)), - instance_buffer_reservation_(nullptr), + instance_buffer_reservation_(new ReservationTracker), is_cancelled_(false), root_node_id_(-1) { Init(); @@ -127,8 +115,7 @@ void RuntimeState::Init() { instance_mem_tracker_.reset(new MemTracker( runtime_profile(), -1, runtime_profile()->name(), query_mem_tracker())); - if (query_state_ != nullptr && exec_env_->buffer_pool() != nullptr) { - instance_buffer_reservation_ = obj_pool()->Add(new ReservationTracker); + if (instance_buffer_reservation_ != nullptr) { instance_buffer_reservation_->InitChildTracker(&profile_, query_state_->buffer_reservation(), instance_mem_tracker_.get(), numeric_limits<int64_t>::max()); @@ -139,28 +126,6 @@ 
void RuntimeState::InitFilterBank() { filter_bank_.reset(new RuntimeFilterBank(query_ctx(), this)); } -Status RuntimeState::CreateBlockMgr() { - DCHECK(block_mgr_.get() == NULL); - - // Compute the max memory the block mgr will use. - int64_t block_mgr_limit = query_mem_tracker()->lowest_limit(); - if (block_mgr_limit < 0) block_mgr_limit = numeric_limits<int64_t>::max(); - block_mgr_limit = min(static_cast<int64_t>(block_mgr_limit * BLOCK_MGR_MEM_FRACTION), - block_mgr_limit - BLOCK_MGR_MEM_MIN_REMAINING); - if (block_mgr_limit < 0) block_mgr_limit = 0; - if (query_options().__isset.max_block_mgr_memory && - query_options().max_block_mgr_memory > 0) { - block_mgr_limit = query_options().max_block_mgr_memory; - LOG(WARNING) << "Block mgr mem limit: " - << PrettyPrinter::Print(block_mgr_limit, TUnit::BYTES); - } - - RETURN_IF_ERROR(BufferedBlockMgr::Create(this, query_mem_tracker(), - runtime_profile(), exec_env()->tmp_file_mgr(), block_mgr_limit, - io_mgr()->max_read_buffer_size(), &block_mgr_)); - return Status::OK(); -} - Status RuntimeState::CreateCodegen() { if (codegen_.get() != NULL) return Status::OK(); // TODO: add the fragment ID to the codegen ID as well @@ -179,6 +144,10 @@ Status RuntimeState::CodegenScalarFns() { return Status::OK(); } +Status RuntimeState::StartSpilling(MemTracker* mem_tracker) { + return query_state_->StartSpilling(this, mem_tracker); +} + string RuntimeState::ErrorLog() { lock_guard<SpinLock> l(error_log_lock_); return PrintErrorMapToString(error_log_); @@ -270,7 +239,6 @@ void RuntimeState::ReleaseResources() { if (resource_pool_ != nullptr) { exec_env_->thread_mgr()->UnregisterPool(resource_pool_); } - block_mgr_.reset(); // Release any block mgr memory, if this is the last reference. codegen_.reset(); // Release any memory associated with codegen. // Release the reservation, which should be unused at the point. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/a98b90bd/be/src/runtime/runtime-state.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/runtime-state.h b/be/src/runtime/runtime-state.h index 9a1d0b2..12e7d8c 100644 --- a/be/src/runtime/runtime-state.h +++ b/be/src/runtime/runtime-state.h @@ -32,7 +32,7 @@ namespace impala { -class BufferedBlockMgr; +class BufferPool; class DataStreamRecvr; class DescriptorTbl; class DiskIoMgr; @@ -92,9 +92,6 @@ class RuntimeState { /// Initializes the runtime filter bank. void InitFilterBank(); - /// Gets/Creates the query wide block mgr. - Status CreateBlockMgr(); - QueryState* query_state() const { return query_state_; } /// Return the query's ObjectPool ObjectPool* obj_pool() const; @@ -132,7 +129,7 @@ class RuntimeState { MemTracker* instance_mem_tracker() { return instance_mem_tracker_.get(); } MemTracker* query_mem_tracker(); // reference to the query_state_'s memtracker ReservationTracker* instance_buffer_reservation() { - return instance_buffer_reservation_; + return instance_buffer_reservation_.get(); } ThreadResourceMgr::ResourcePool* resource_pool() { return resource_pool_; } @@ -206,11 +203,6 @@ class RuntimeState { /// Unregisters all reader contexts acquired through AcquireReaderContext(). void UnregisterReaderContexts(); - BufferedBlockMgr* block_mgr() { - DCHECK(block_mgr_.get() != NULL); - return block_mgr_.get(); - } - inline Status GetQueryStatus() { // Do a racy check for query_status_ to avoid unnecessary spinlock acquisition. if (UNLIKELY(!query_status_.ok())) { @@ -307,21 +299,19 @@ class RuntimeState { /// TODO: Fix IMPALA-4233 Status CodegenScalarFns(); + /// Helper to call QueryState::StartSpilling(). + Status StartSpilling(MemTracker* mem_tracker); + /// Release resources and prepare this object for destruction. void ReleaseResources(); private: - /// Allow TestEnv to set block_mgr manually for testing. 
+ /// Allow TestEnv to use private methods for testing. friend class TestEnv; /// Set per-fragment state. void Init(); - /// Use a custom block manager for the query for testing purposes. - void set_block_mgr(const std::shared_ptr<BufferedBlockMgr>& block_mgr) { - block_mgr_ = block_mgr; - } - /// Lock protecting error_log_ SpinLock error_log_lock_; @@ -382,9 +372,8 @@ class RuntimeState { boost::scoped_ptr<MemTracker> instance_mem_tracker_; /// Buffer reservation for this fragment instance - a child of the query buffer - /// reservation. Non-NULL if 'query_state_' is not NULL and ExecEnv::buffer_pool_ - /// was created by a backend test. Owned by obj_pool(). - ReservationTracker* instance_buffer_reservation_; + /// reservation. Non-NULL if 'query_state_' is not NULL. + boost::scoped_ptr<ReservationTracker> instance_buffer_reservation_; /// if true, execution should stop with a CANCELLED status bool is_cancelled_; @@ -401,11 +390,6 @@ class RuntimeState { SpinLock reader_contexts_lock_; std::vector<DiskIoRequestContext*> reader_contexts_; - /// BufferedBlockMgr object used to allocate and manage blocks of input data in memory - /// with a fixed memory budget. - /// The block mgr is shared by all fragments for this query. - std::shared_ptr<BufferedBlockMgr> block_mgr_; - /// This is the node id of the root node for this plan fragment. This is used as the /// hash seed and has two useful properties: /// 1) It is the same for all exec nodes in a fragment, so the resulting hash values
