IMPALA-5307: Part 4: copy out uncompressed text and seq

This is the final patch for IMPALA-5307.

Text and Seq scanners are converted to use the same approach as Avro.
contains_tuple_data is now always false, so a bunch of dead code in
ScannerContext can be removed. We also no longer attach I/O buffers to
row batches, so that logic can be removed.

Testing:
Ran core ASAN tests.

Perf:
I reran the same benchmarks as in Part 2. There was no measurable
difference before and after: for both text and seq, processing time is
dominated by text parsing.
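[Editor's illustration, not part of the commit: the core of the new approach is
that a scanner first materializes tuples whose StringValue slots point into an
I/O buffer, then deep-copies those strings into the row batch's MemPool before
the buffer is returned to the I/O manager. The patch does this with
Tuple::CopyStrings(); the helper below is a hypothetical, simplified sketch of
the same idea, using the SlotOffsets metadata that the patch populates in the
HdfsScanner constructor.

  // Sketch only; real headers would be runtime/tuple.h, runtime/mem-pool.h,
  // runtime/string-value.h, common/compiler-util.h, and <cstring> for memcpy.
  // Deep-copy every non-NULL string slot in 'tuple' into 'pool' so the tuple
  // no longer references I/O buffer memory. Returns false if allocation fails.
  static bool CopyOutStringSlots(
      Tuple* tuple, const SlotOffsets* offsets, int num_offsets, MemPool* pool) {
    for (int i = 0; i < num_offsets; ++i) {
      if (tuple->IsNull(offsets[i].null_indicator_offset)) continue;
      StringValue* sv = reinterpret_cast<StringValue*>(
          reinterpret_cast<uint8_t*>(tuple) + offsets[i].tuple_offset);
      uint8_t* copy = pool->TryAllocate(sv->len);
      if (UNLIKELY(copy == nullptr)) return false;  // caller reports the error
      memcpy(copy, sv->ptr, sv->len);
      sv->ptr = reinterpret_cast<char*>(copy);  // now points into 'pool'
    }
    return true;
  }
]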
Change-Id: I304fd002b61bfedf41c8b1405cd7eb7b492bb941
Reviewed-on: http://gerrit.cloudera.org:8080/8172
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/19c17e64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/19c17e64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/19c17e64

Branch: refs/heads/master
Commit: 19c17e64b54fcc0a599eadaa14d35943ee703ed4
Parents: 73d1bc3
Author: Tim Armstrong <[email protected]>
Authored: Tue Sep 26 15:27:30 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Wed Nov 8 10:09:25 2017 +0000

----------------------------------------------------------------------
 be/src/codegen/gen_ir_descriptions.py |  2 +-
 be/src/exec/base-sequence-scanner.cc  |  6 +--
 be/src/exec/exec-node.cc              |  9 +----
 be/src/exec/exec-node.h               |  8 +---
 be/src/exec/hdfs-avro-scanner-ir.cc   |  4 +-
 be/src/exec/hdfs-avro-scanner.cc      |  5 ---
 be/src/exec/hdfs-avro-scanner.h       |  6 ---
 be/src/exec/hdfs-parquet-scanner.cc   |  9 ++---
 be/src/exec/hdfs-rcfile-scanner.cc    |  4 --
 be/src/exec/hdfs-scan-node-base.h     |  6 ---
 be/src/exec/hdfs-scan-node-mt.cc      |  2 -
 be/src/exec/hdfs-scan-node.cc         |  9 +----
 be/src/exec/hdfs-scanner-ir.cc        | 16 ++++++--
 be/src/exec/hdfs-scanner.cc           | 20 +++++++++-
 be/src/exec/hdfs-scanner.h            | 27 +++++++++----
 be/src/exec/hdfs-sequence-scanner.cc  | 22 ++++++++---
 be/src/exec/hdfs-text-scanner.cc      | 28 ++++++--------
 be/src/exec/parquet-column-readers.cc |  2 +-
 be/src/exec/scanner-context.cc        | 32 +++------------
 be/src/exec/scanner-context.h         | 47 +++++++++--------------
 be/src/exec/text-converter.cc         |  4 +-
 be/src/runtime/row-batch.cc           | 28 +++----------
 be/src/runtime/row-batch.h            | 60 +++++++++++++-----------------
 23 files changed, 144 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/codegen/gen_ir_descriptions.py
----------------------------------------------------------------------
diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index 28e0a71..b3ad25d 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -174,7 +174,7 @@ ir_functions = [
   ["HDFS_SCANNER_INIT_TUPLE",
    "_ZN6impala11HdfsScanner9InitTupleEPNS_5TupleES2_"],
   ["HDFS_SCANNER_WRITE_ALIGNED_TUPLES",
-   "_ZN6impala11HdfsScanner18WriteAlignedTuplesEPNS_7MemPoolEPNS_8TupleRowEiPNS_13FieldLocationEiiii"],
+   "_ZN6impala11HdfsScanner18WriteAlignedTuplesEPNS_7MemPoolEPNS_8TupleRowEPNS_13FieldLocationEiiiib"],
   ["PROCESS_SCRATCH_BATCH",
    "_ZN6impala18HdfsParquetScanner19ProcessScratchBatchEPNS_8RowBatchE"],
   ["PARQUET_SCANNER_EVAL_RUNTIME_FILTER",


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index c22f18d..fcf58c6 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -98,9 +98,6 @@ Status BaseSequenceScanner::Open(ScannerContext* context) {
     only_parsing_header_ = true;
     return Status::OK();
   }
-
-  // If the file is compressed, the buffers in the stream_ are not used directly.
-  if (header_->is_compressed) stream_->set_contains_tuple_data(false);
   RETURN_IF_ERROR(InitNewRange());

   // Skip to the first record
@@ -128,7 +125,6 @@ void BaseSequenceScanner::Close(RowBatch* row_batch) {
   if (row_batch != nullptr) {
     row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
-    context_->ReleaseCompletedResources(row_batch, true);
     if (scan_node_->HasRowBatchQueue()) {
       static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
           unique_ptr<RowBatch>(row_batch));
@@ -136,8 +132,8 @@
   } else {
     data_buffer_pool_->FreeAll();
     template_tuple_pool_->FreeAll();
-    context_->ReleaseCompletedResources(nullptr, true);
   }
+  context_->ReleaseCompletedResources(true);

   // Verify all resources (if any) have been transferred.
   DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/exec-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index aaca8be..afb6249 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -103,21 +103,14 @@ unique_ptr<RowBatch> ExecNode::RowBatchQueue::GetBatch() {
   return unique_ptr<RowBatch>();
 }

-int ExecNode::RowBatchQueue::Cleanup() {
-  int num_io_buffers = 0;
-
+void ExecNode::RowBatchQueue::Cleanup() {
   unique_ptr<RowBatch> batch = NULL;
   while ((batch = GetBatch()) != NULL) {
-    num_io_buffers += batch->num_io_buffers();
     batch.reset();
   }

   lock_guard<SpinLock> l(lock_);
-  for (const unique_ptr<RowBatch>& row_batch: cleanup_queue_) {
-    num_io_buffers += row_batch->num_io_buffers();
-  }
   cleanup_queue_.clear();
-  return num_io_buffers;
 }

 ExecNode::ExecNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/exec-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index a0ed352..187d9a7 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -81,7 +80,6 @@ class ExecNode {
   virtual void Codegen(RuntimeState* state);

   /// Performs any preparatory work prior to calling GetNext().
-  /// Caller must not be holding any io buffers. This will cause deadlock.
   /// If overridden in subclass, must first call superclass's Open().
   /// Open() is called after Prepare() or Reset(), i.e., possibly multiple times
   /// throughout the lifetime of this node.
@@ -91,7 +90,7 @@
   /// child before acquiring their own resources to reduce the peak resource requirement.
   /// This is particularly important if there are multiple blocking ExecNodes in a
   /// pipeline because the lower nodes will release resources in Close() before the
-  /// Open() of their parent retuns. The resource profile calculation in the frontend
+  /// Open() of their parent returns. The resource profile calculation in the frontend
   /// relies on this when computing the peak resources required for a query.
   virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT;
@@ -106,8 +105,6 @@
   /// In other words, if the memory holding the tuple data will be referenced
   /// by the callee in subsequent GetNext() calls, it must *not* be attached to the
   /// row_batch's tuple_data_pool.
-  /// Caller must not be holding any io buffers. This will cause deadlock.
-  /// TODO: AggregationNode and HashJoinNode cannot be "re-opened" yet.
   virtual Status GetNext(
       RuntimeState* state, RowBatch* row_batch, bool* eos) WARN_UNUSED_RESULT = 0;

@@ -274,8 +271,7 @@
     /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch()
     /// after this is called.
-    /// Returns the number of io buffers that were released (for debug tracking)
-    int Cleanup();
+    void Cleanup();

    private:
     /// Lock protecting cleanup_queue_


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-avro-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner-ir.cc b/be/src/exec/hdfs-avro-scanner-ir.cc
index a2bf606..ffd5774 100644
--- a/be/src/exec/hdfs-avro-scanner-ir.cc
+++ b/be/src/exec/hdfs-avro-scanner-ir.cc
@@ -37,7 +37,7 @@ int HdfsAvroScanner::DecodeAvroData(int max_tuples, MemPool* pool, uint8_t** dat
   // If the file is uncompressed, StringValues will have pointers into the I/O buffers.
   // We don't attach I/O buffers to output batches so need to copy out data referenced
   // by tuples that survive conjunct evaluation.
-  const bool copy_out_strings = !header_->is_compressed && !string_slot_offsets_.empty();
+  const bool copy_strings = !header_->is_compressed && !string_slot_offsets_.empty();
   int num_to_commit = 0;
   for (int i = 0; i < max_tuples; ++i) {
     InitTuple(template_tuple_, tuple);
@@ -47,7 +47,7 @@
     }
     tuple_row->SetTuple(scan_node_->tuple_idx(), tuple);
     if (EvalConjuncts(tuple_row)) {
-      if (copy_out_strings) {
+      if (copy_strings) {
         if (UNLIKELY(!tuple->CopyStrings("HdfsAvroScanner::DecodeAvroData()",
             state_, string_slot_offsets_.data(), string_slot_offsets_.size(), pool,
             &parse_status_))) {


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-avro-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.cc b/be/src/exec/hdfs-avro-scanner.cc
index cad74e6..dea51a9 100644
--- a/be/src/exec/hdfs-avro-scanner.cc
+++ b/be/src/exec/hdfs-avro-scanner.cc
@@ -66,10 +66,6 @@ static Status CheckSchema(const AvroSchemaElement& avro_schema) {

 HdfsAvroScanner::HdfsAvroScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
   : BaseSequenceScanner(scan_node, state) {
-  for (SlotDescriptor* string_slot : scan_node_->tuple_desc()->string_slots()) {
-    string_slot_offsets_.push_back(
-        {string_slot->null_indicator_offset(), string_slot->tuple_offset()});
-  }
 }

 HdfsAvroScanner::HdfsAvroScanner()
@@ -80,7 +76,6 @@
 Status HdfsAvroScanner::Open(ScannerContext* context) {
   RETURN_IF_ERROR(BaseSequenceScanner::Open(context));
   RETURN_IF_ERROR(CheckSchema(scan_node_->avro_schema()));
-  stream_->set_contains_tuple_data(false); // Avro scanner always copies out data.
   return Status::OK();
 }


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-avro-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-avro-scanner.h b/be/src/exec/hdfs-avro-scanner.h
index ab5746a..e2260a6 100644
--- a/be/src/exec/hdfs-avro-scanner.h
+++ b/be/src/exec/hdfs-avro-scanner.h
@@ -130,12 +130,6 @@ class HdfsAvroScanner : public BaseSequenceScanner {
     bool use_codegend_decode_avro_data;
   };

-  /// Offsets of string slots in the result tuple that may need to be copied as part of
-  /// tuple materialization. Populated in constructor. This is redundant with offset
-  /// information stored in the TupleDescriptor but storing only the required metadata
-  /// in a simple array of struct simplifies codegen and speeds up interpretation.
-  std::vector<SlotOffsets> string_slot_offsets_;
-
   AvroFileHeader* avro_header_ = nullptr;

   /// Current data block after decompression with its end and length.


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index e35f64a..2897287 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -164,7 +164,6 @@ HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState

 Status HdfsParquetScanner::Open(ScannerContext* context) {
   RETURN_IF_ERROR(HdfsScanner::Open(context));
-  stream_->set_contains_tuple_data(false);
   metadata_range_ = stream_->scan_range();
   num_cols_counter_ =
       ADD_COUNTER(scan_node_->runtime_profile(), "NumColumns", TUnit::UNIT);
@@ -228,7 +227,7 @@

   // Release I/O buffers immediately to make sure they are cleaned up
   // in case we return a non-OK status anywhere below.
-  context_->ReleaseCompletedResources(nullptr, true);
+  context_->ReleaseCompletedResources(true);
   RETURN_IF_ERROR(footer_status);

   // Parse the file schema into an internal representation for schema resolution.
@@ -265,7 +264,7 @@
   } else {
     template_tuple_pool_->FreeAll();
     dictionary_pool_.get()->FreeAll();
-    context_->ReleaseCompletedResources(nullptr, true);
+    context_->ReleaseCompletedResources(true);
     for (ParquetColumnReader* col_reader : column_readers_) col_reader->Close(nullptr);
     // The scratch batch may still contain tuple data. We can get into this case if
     // Open() fails or if the query is cancelled.
@@ -731,7 +730,7 @@ void HdfsParquetScanner::FlushRowGroupResources(RowBatch* row_batch) {
   DCHECK(row_batch != nullptr);
   row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
   scratch_batch_->ReleaseResources(row_batch->tuple_data_pool());
-  context_->ReleaseCompletedResources(nullptr, true);
+  context_->ReleaseCompletedResources(true);
   for (ParquetColumnReader* col_reader : column_readers_) {
     col_reader->Close(row_batch);
   }
@@ -1676,8 +1675,6 @@ Status HdfsParquetScanner::InitColumns(

     DCHECK(stream != NULL);
     RETURN_IF_ERROR(scalar_reader->Reset(&col_chunk.meta_data, stream));
-    // Parquet column readers never return tuple data with pointers into I/O buffers.
-    stream->set_contains_tuple_data(false);
   }
   DCHECK_EQ(col_ranges.size(), num_scalar_readers);


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-rcfile-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-rcfile-scanner.cc b/be/src/exec/hdfs-rcfile-scanner.cc
index 2ea8229..a706c3d 100644
--- a/be/src/exec/hdfs-rcfile-scanner.cc
+++ b/be/src/exec/hdfs-rcfile-scanner.cc
@@ -77,10 +77,6 @@ Status HdfsRCFileScanner::InitNewRange() {
   // ptrs into the decompressed data).
   reuse_row_group_buffer_ = scan_node_->tuple_desc()->string_slots().empty();

-  // The scanner currently copies all the column data out of the io buffer so the
-  // stream never contains any tuple data.
-  stream_->set_contains_tuple_data(false);
-
   if (header_->is_compressed) {
     RETURN_IF_ERROR(Codec::CreateDecompressor(nullptr,
         reuse_row_group_buffer_, header_->codec, &decompressor_));


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index f6e010a..6a0abde 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -421,12 +421,6 @@ class HdfsScanNodeBase : public ScanNode {
   AtomicInt32 num_scanners_codegen_enabled_;
   AtomicInt32 num_scanners_codegen_disabled_;

-  /// This is the number of io buffers that are owned by the scan node and the scanners.
-  /// This is used just to help debug leaked io buffers to determine if the leak is
-  /// happening in the scanners vs other parts of the execution.
-  /// TODO: Remove this counter when deprecating the multi-threaded scan node.
-  AtomicInt32 num_owned_io_buffers_;
-
   /// If true, counters are actively running and need to be reported in the runtime
   /// profile.
   bool counters_running_ = false;


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 8d4efec..6803b69 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -103,7 +103,6 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
     if (!status.ok()) {
       scanner_->Close(row_batch);
       scanner_.reset();
-      num_owned_io_buffers_.Add(-row_batch->num_io_buffers());
       return status;
     }
     InitNullCollectionValues(row_batch);
@@ -119,7 +118,6 @@
     *eos = true;
   }
   COUNTER_SET(rows_returned_counter_, num_rows_returned_);
-  num_owned_io_buffers_.Add(-row_batch->num_io_buffers());
   if (*eos) StopAndFinalizeCounters();

   return Status::OK();


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 8cc85ff..8be191f 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -115,7 +115,6 @@ Status HdfsScanNode::GetNextInternal(
   *eos = false;
   unique_ptr<RowBatch> materialized_batch = materialized_row_batches_->GetBatch();
   if (materialized_batch != NULL) {
-    num_owned_io_buffers_.Add(-materialized_batch->num_io_buffers());
    row_batch->AcquireState(materialized_batch.get());
    // Update the number of materialized rows now instead of when they are materialized.
    // This means that scanners might process and queue up more rows than are necessary
@@ -133,7 +132,6 @@
      *eos = true;
      SetDone();
    }
-    DCHECK_EQ(materialized_batch->num_io_buffers(), 0);
    materialized_batch.reset();
    return Status::OK();
   }
@@ -229,16 +227,11 @@ Status HdfsScanNode::Open(RuntimeState* state) {

 void HdfsScanNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   SetDone();
-
   if (thread_avail_cb_id_ != -1) {
     state->resource_pool()->RemoveThreadAvailableCb(thread_avail_cb_id_);
   }
-
   scanner_threads_.JoinAll();
-
-  num_owned_io_buffers_.Add(-materialized_row_batches_->Cleanup());
-  DCHECK_EQ(num_owned_io_buffers_.Load(), 0) << "ScanNode has leaked io buffers";
-
+  materialized_row_batches_->Cleanup();
   HdfsScanNodeBase::Close(state);
 }


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-scanner-ir.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc
index 867ce1e..d7c2d81 100644
--- a/be/src/exec/hdfs-scanner-ir.cc
+++ b/be/src/exec/hdfs-scanner-ir.cc
@@ -19,6 +19,7 @@
 #include "runtime/row-batch.h"
 #include "util/string-parser.h"
 #include "runtime/string-value.inline.h"
+#include "runtime/tuple.h"

 #include "common/names.h"

@@ -33,9 +34,9 @@ using namespace impala;
 // This function takes more arguments than are strictly necessary (they could be
 // computed inside this function) but this is done to minimize the clang dependencies,
 // specifically, calling function on the scan node.
-int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int row_size,
+int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row,
     FieldLocation* fields, int num_tuples, int max_added_tuples,
-    int slots_per_tuple, int row_idx_start) {
+    int slots_per_tuple, int row_idx_start, bool copy_strings) {
   DCHECK(tuple_ != NULL);
   uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(tuple_row);
   uint8_t* tuple_mem = reinterpret_cast<uint8_t*>(tuple_);
@@ -53,9 +54,16 @@
     // function.
     if (WriteCompleteTuple(pool, fields, tuple, tuple_row, template_tuple_, error,
         &error_in_row)) {
+      if (copy_strings) {
+        if (UNLIKELY(!tuple->CopyStrings("HdfsScanner::WriteAlignedTuples()",
+            state_, string_slot_offsets_.data(), string_slot_offsets_.size(), pool,
+            &parse_status_))) {
+          return -1;
+        }
+      }
       ++tuples_returned;
-      tuple_mem += tuple_byte_size_;
-      tuple_row_mem += row_size;
+      tuple_mem += tuple_byte_size();
+      tuple_row_mem += sizeof(Tuple*);
       tuple = reinterpret_cast<Tuple*>(tuple_mem);
       tuple_row = reinterpret_cast<TupleRow*>(tuple_row_mem);
     }


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 4107e42..959aa91 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -48,6 +48,12 @@ HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
     template_tuple_pool_(new MemPool(scan_node->mem_tracker())),
     tuple_byte_size_(scan_node->tuple_desc()->byte_size()),
     data_buffer_pool_(new MemPool(scan_node->mem_tracker())) {
+  DCHECK_EQ(1, scan_node->row_desc()->tuple_descriptors().size())
+      << "All HDFS scanners assume one tuple per row";
+  for (SlotDescriptor* string_slot : scan_node_->tuple_desc()->string_slots()) {
+    string_slot_offsets_.push_back(
+        {string_slot->null_indicator_offset(), string_slot->tuple_offset()});
+  }
 }

 HdfsScanner::HdfsScanner()
@@ -188,7 +194,7 @@ Status HdfsScanner::CommitRows(int num_rows, RowBatch* row_batch) {
   // which can happen if the query is very selective. We need to release memory even
   // if no rows passed predicates.
   if (row_batch->AtCapacity() || context_->num_completed_io_buffers() > 0) {
-    context_->ReleaseCompletedResources(row_batch, /* done */ false);
+    context_->ReleaseCompletedResources(/* done */ false);
   }
   if (context_->cancelled()) return Status::CANCELLED;
   // Check for UDF errors.
@@ -524,6 +530,18 @@ Status HdfsScanner::CodegenWriteAlignedTuples(const HdfsScanNodeBase* node,
       "WriteCompleteTuple");
   DCHECK_EQ(replaced, 1);

+  Function* copy_strings_fn;
+  RETURN_IF_ERROR(Tuple::CodegenCopyStrings(
+      codegen, *node->tuple_desc(), &copy_strings_fn));
+  replaced = codegen->ReplaceCallSites(
+      write_tuples_fn, copy_strings_fn, "CopyStrings");
+  DCHECK_EQ(replaced, 1);
+
+  int tuple_byte_size = node->tuple_desc()->byte_size();
+  replaced = codegen->ReplaceCallSitesWithValue(write_tuples_fn,
+      codegen->GetIntConstant(TYPE_INT, tuple_byte_size), "tuple_byte_size");
+  DCHECK_EQ(replaced, 1);
+
   *write_aligned_tuples_fn = codegen->FinalizeFunction(write_tuples_fn);
   if (*write_aligned_tuples_fn == NULL) {
     return Status("Failed to finalize write_aligned_tuples_fn.");


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index cb69bbe..c603593 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -96,7 +96,7 @@ struct FieldLocation {
 //
 /// This class also encapsulates row batch management. Subclasses should call CommitRows()
 /// after writing to the current row batch, which handles creating row batches, attaching
-/// resources (IO buffers and mem pools) to the current row batch, and passing row batches
+/// resources (buffers and mem pools) to the current row batch, and passing row batches
 /// up to the scan node. Subclasses can also use GetMemory() to help with per-row memory
 /// management.
 /// TODO: Have a pass over all members and move them out of the base class if sensible
@@ -137,7 +137,7 @@ class HdfsScanner {
   /// queue. Only valid to call if HasRowBatchQueue().
   void Close();

-  /// Transfers the ownership of memory backing returned tuples such as IO buffers
+  /// Transfers the ownership of memory backing returned tuples such as buffers
   /// and memory in mem pools to the given row batch. If the row batch is NULL,
   /// those resources are released instead. In any case, releases all other resources
   /// that are not backing returned rows (e.g. temporary decompression buffers).
@@ -271,13 +271,19 @@ class HdfsScanner {
   /// decompressor and any other per data block allocations.
   boost::scoped_ptr<MemPool> data_buffer_pool_;

+  /// Offsets of string slots in the result tuple that may need to be copied as part of
+  /// tuple materialization. Populated in constructor. This is redundant with offset
+  /// information stored in the TupleDescriptor but storing only the required metadata
+  /// in a simple array of struct simplifies codegen and speeds up interpretation.
+  std::vector<SlotOffsets> string_slot_offsets_;
+
   /// Time spent decompressing bytes.
   RuntimeProfile::Counter* decompress_timer_ = nullptr;

   /// Matching typedef for WriteAlignedTuples for codegen. Refer to comments for
   /// that function.
-  typedef int (*WriteTuplesFn)(HdfsScanner*, MemPool*, TupleRow*, int, FieldLocation*,
-      int, int, int, int);
+  typedef int (*WriteTuplesFn)(HdfsScanner*, MemPool*, TupleRow*, FieldLocation*,
+      int, int, int, int, bool);

   /// Jitted write tuples function pointer. Null if codegen is disabled.
   WriteTuplesFn write_tuples_fn_ = nullptr;

@@ -332,15 +338,18 @@
   /// - 'fields' must start at the beginning of a tuple.
   /// - 'num_tuples' number of tuples to process
   /// - 'max_added_tuples' the maximum number of tuples that should be added to the batch.
-  /// - 'row_start_index' is the number of rows that have already been processed
+  /// - 'row_idx_start' is the number of rows that have already been processed
   ///   as part of WritePartialTuple.
+  /// - 'copy_strings': if true, strings in returned tuples that pass conjuncts are
+  ///   copied into 'pool'
   /// Returns the number of tuples added to the row batch. This can be less than
   /// num_tuples/tuples_till_limit because of failed conjuncts.
-  /// Returns -1 if parsing should be aborted due to parse errors.
+  /// Returns -1 if an error is encountered, e.g. a parse error or a memory allocation
+  /// error.
   /// Only valid to call if the parent scan node is multi-threaded.
-  int WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row_mem, int row_size,
-      FieldLocation* fields, int num_tuples,
-      int max_added_tuples, int slots_per_tuple, int row_start_indx);
+  int WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row_mem, FieldLocation* fields,
+      int num_tuples, int max_added_tuples, int slots_per_tuple, int row_idx_start,
+      bool copy_strings);

   /// Update the decompressor_ object given a compression type or codec name. Depending on
   /// the old compression type and the new one, it may close the old decompressor and/or


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc
index 9b66432..1c248bb 100644
--- a/be/src/exec/hdfs-sequence-scanner.cc
+++ b/be/src/exec/hdfs-sequence-scanner.cc
@@ -266,6 +266,9 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) {

   // Materialize parsed cols to tuples
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());
+
+  // Need to copy out strings if they may reference the original I/O buffer.
+  const bool copy_strings = !header_->is_compressed && !string_slot_offsets_.empty();
   // Call jitted function if possible
   int tuples_returned;
   if (write_tuples_fn_ != nullptr) {
@@ -275,12 +278,12 @@
         delimited_text_parser_->escape_char() == '\0');
     // last argument: seq always starts at record_location[0]
     tuples_returned = write_tuples_fn_(this, row_batch->tuple_data_pool(), tuple_row,
-        row_batch->row_byte_size(), field_locations_.data(), num_to_process,
-        max_added_tuples, scan_node_->materialized_slots().size(), 0);
+        field_locations_.data(), num_to_process,
+        max_added_tuples, scan_node_->materialized_slots().size(), 0, copy_strings);
   } else {
     tuples_returned = WriteAlignedTuples(row_batch->tuple_data_pool(), tuple_row,
-        row_batch->row_byte_size(), field_locations_.data(), num_to_process,
-        max_added_tuples, scan_node_->materialized_slots().size(), 0);
+        field_locations_.data(), num_to_process,
+        max_added_tuples, scan_node_->materialized_slots().size(), 0, copy_strings);
   }

   if (tuples_returned == -1) return parse_status_;
@@ -301,6 +304,7 @@ Status HdfsSequenceScanner::ProcessRange(RowBatch* row_batch) {
   SCOPED_TIMER(scan_node_->materialize_tuple_timer());

   int64_t num_rows_read = 0;
+  const bool copy_strings = !seq_header->is_compressed && !string_slot_offsets_.empty();
   const bool has_materialized_slots = !scan_node_->materialized_slots().empty();
   while (!eos_) {
     DCHECK_GT(record_locations_.size(), 0);
@@ -323,12 +327,20 @@
         uint8_t error_in_row = false;
         uint8_t errors[num_fields];
         memset(errors, 0, num_fields);
-        add_row = WriteCompleteTuple(row_batch->tuple_data_pool(), field_locations_.data(),
+        MemPool* pool = row_batch->tuple_data_pool();
+        add_row = WriteCompleteTuple(pool, field_locations_.data(),
             tuple_, tuple_row_mem, template_tuple_, &errors[0], &error_in_row);
         if (UNLIKELY(error_in_row)) {
           ReportTupleParseError(field_locations_.data(), errors);
           RETURN_IF_ERROR(parse_status_);
         }
+        if (add_row && copy_strings) {
+          if (UNLIKELY(!tuple_->CopyStrings("HdfsSequenceScanner::ProcessRange()",
+              state_, string_slot_offsets_.data(), string_slot_offsets_.size(), pool,
+              &parse_status_))) {
+            return parse_status_;
+          }
+        }
       } else {
         add_row = WriteTemplateTuples(tuple_row_mem, 1);
       }


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index f1c1ff5..1cbc6f5 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -167,7 +167,6 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {
   if (row_batch != nullptr) {
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
     row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
-    context_->ReleaseCompletedResources(row_batch, true);
     if (scan_node_->HasRowBatchQueue()) {
       static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
           unique_ptr<RowBatch>(row_batch));
@@ -175,8 +174,8 @@
   } else {
     template_tuple_pool_->FreeAll();
     data_buffer_pool_->FreeAll();
-    context_->ReleaseCompletedResources(nullptr, true);
   }
+  context_->ReleaseCompletedResources(true);

   // Verify all resources (if any) have been transferred or freed.
   DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0);
@@ -192,12 +191,6 @@ void HdfsTextScanner::Close(RowBatch* row_batch) {

 Status HdfsTextScanner::InitNewRange() {
   DCHECK_EQ(scan_state_, CONSTRUCTED);
-  // Compressed text does not reference data in the io buffers directly. In such case, we
-  // can recycle the buffers in the stream_ more promptly.
-  if (stream_->file_desc()->file_compression != THdfsCompression::NONE) {
-    stream_->set_contains_tuple_data(false);
-  }
-
   // Update the decompressor based on the compression type of the file in the context.
   DCHECK(stream_->file_desc()->file_compression != THdfsCompression::SNAPPY)
       << "FE should have generated SNAPPY_BLOCKED instead.";
@@ -591,7 +584,7 @@ Status HdfsTextScanner::FillByteBufferCompressedStream(MemPool* pool, bool* eosr

   if (*eosr) {
     DCHECK(stream_->eosr());
-    context_->ReleaseCompletedResources(nullptr, true);
+    context_->ReleaseCompletedResources(true);
   }

   return Status::OK();
@@ -637,7 +630,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) {
       &decompressed_buffer));

   // Inform 'stream_' that the buffer with the compressed text can be released.
-  context_->ReleaseCompletedResources(nullptr, true);
+  context_->ReleaseCompletedResources(true);

   VLOG_FILE << "Decompressed " << byte_buffer_read_size_ << " to " << decompressed_len;
   byte_buffer_ptr_ = reinterpret_cast<char*>(decompressed_buffer);
@@ -837,6 +830,9 @@ int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,

   // Write complete tuples. The current field, if any, is at the start of a tuple.
   if (num_tuples > 0) {
+    // Need to copy out strings if they may reference the original I/O buffer.
+    const bool copy_strings = !string_slot_offsets_.empty() &&
+        stream_->file_desc()->file_compression == THdfsCompression::NONE;
     int max_added_tuples = (scan_node_->limit() == -1) ?
         num_tuples : scan_node_->limit() - scan_node_->rows_returned();

     int tuples_returned = 0;
@@ -846,13 +842,13 @@
       // slots and escape characters. TextConverter::WriteSlot() will be used instead.
       DCHECK(scan_node_->tuple_desc()->string_slots().empty() ||
           delimited_text_parser_->escape_char() == '\0');
-      tuples_returned = write_tuples_fn_(this, pool, row, sizeof(Tuple*), fields,
-          num_tuples, max_added_tuples, scan_node_->materialized_slots().size(),
-          num_tuples_processed);
+      tuples_returned = write_tuples_fn_(this, pool, row, fields, num_tuples,
+          max_added_tuples, scan_node_->materialized_slots().size(),
+          num_tuples_processed, copy_strings);
     } else {
-      tuples_returned = WriteAlignedTuples(pool, row, sizeof(Tuple*), fields,
-          num_tuples, max_added_tuples, scan_node_->materialized_slots().size(),
-          num_tuples_processed);
+      tuples_returned = WriteAlignedTuples(pool, row, fields, num_tuples,
+          max_added_tuples, scan_node_->materialized_slots().size(),
+          num_tuples_processed, copy_strings);
     }
     if (tuples_returned == -1) return 0;
     DCHECK_EQ(slot_idx_, 0);


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 15c963d..04127e3 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -996,7 +996,7 @@ Status BaseScalarColumnReader::ReadDataPage() {
     }
     // We don't hold any pointers to earlier pages in the stream - we can safely free
     // any accumulated I/O or boundary buffers.
-    stream_->ReleaseCompletedResources(nullptr, false);
+    stream_->ReleaseCompletedResources(false);

     // Read the next data page, skipping page types we don't care about.
     // We break out of this loop on the non-error case (a data page was found or we read all


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index 3ed8b4a..8cb195d 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -58,9 +58,9 @@ ScannerContext::~ScannerContext() {
   DCHECK(streams_.empty());
 }

-void ScannerContext::ReleaseCompletedResources(RowBatch* batch, bool done) {
+void ScannerContext::ReleaseCompletedResources(bool done) {
   for (int i = 0; i < streams_.size(); ++i) {
-    streams_[i]->ReleaseCompletedResources(batch, done);
+    streams_[i]->ReleaseCompletedResources(done);
   }
 }

@@ -87,13 +87,11 @@ ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) {
   stream->output_buffer_pos_ = NULL;
   stream->output_buffer_bytes_left_ =
       const_cast<int64_t*>(&OUTPUT_BUFFER_BYTES_LEFT_INIT);
-  stream->contains_tuple_data_ = scan_node_->tuple_desc()->ContainsStringData();
   streams_.push_back(std::move(stream));
   return streams_.back().get();
 }

-void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool done) {
-  DCHECK(batch != nullptr || done || !contains_tuple_data_);
+void ScannerContext::Stream::ReleaseCompletedResources(bool done) {
   if (done) {
     // Mark any pending resources as completed
     if (io_buffer_ != nullptr) {
@@ -108,24 +106,11 @@
   }

   for (unique_ptr<DiskIoMgr::BufferDescriptor>& buffer : completed_io_buffers_) {
-    if (contains_tuple_data_ && batch != nullptr) {
-      batch->AddIoBuffer(move(buffer));
-      // TODO: We can do row batch compaction here. This is the only place io buffers are
-      // queued. A good heuristic is to check the number of io buffers queued and if
-      // there are too many, we should compact.
-    } else {
-      ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(buffer));
-      parent_->scan_node_->num_owned_io_buffers_.Add(-1);
-    }
+    ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(buffer));
   }
   parent_->num_completed_io_buffers_ -= completed_io_buffers_.size();
   completed_io_buffers_.clear();

-  if (contains_tuple_data_ && batch != nullptr) {
-    // If we're not done, keep using the last chunk allocated in boundary_pool_ so we
-    // don't have to reallocate. If we are done, transfer it to the row batch.
-    batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), /* keep_current */ !done);
-  }
   if (done) boundary_pool_->FreeAll();
 }

@@ -196,7 +181,6 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
         filename(), offset));
   }

-  parent_->scan_node_->num_owned_io_buffers_.Add(1);
   io_buffer_pos_ = reinterpret_cast<uint8_t*>(io_buffer_->buffer());
   io_buffer_bytes_left_ = io_buffer_->len();
   if (io_buffer_->len() == 0) {
@@ -262,13 +246,7 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len,
   DCHECK_GT(requested_len, boundary_buffer_bytes_left_);
   *out_buffer = NULL;

-  if (boundary_buffer_bytes_left_ == 0) {
-    if (contains_tuple_data_) {
-      boundary_buffer_->Reset();
-    } else {
-      boundary_buffer_->Clear();
-    }
-  }
+  if (boundary_buffer_bytes_left_ == 0) boundary_buffer_->Clear();

   DCHECK(ValidateBufferPointers());


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/scanner-context.h
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h
index 3fe14aa..216209f 100644
--- a/be/src/exec/scanner-context.h
+++ b/be/src/exec/scanner-context.h
@@ -100,13 +100,6 @@ class ScannerContext {
     /// If we are past the end of the scan range, no bytes are returned.
     Status GetBuffer(bool peek, uint8_t** buffer, int64_t* out_len);

-    /// Sets whether of not the resulting tuples contain ptrs into memory owned by
-    /// the scanner context. This by default, is inferred from the scan_node tuple
-    /// descriptor (i.e. contains string slots) but can be overridden. If possible,
-    /// this should be set to false to reduce memory usage as resources can be reused
-    /// and recycled more quickly.
-    void set_contains_tuple_data(bool v) { contains_tuple_data_ = v; }
-
     /// Callback that returns the buffer size to use when reading past the end of the scan
     /// range. Reading past the end of the scan range is likely a remote read, so we want
     /// find a good trade-off between io requests and data volume. Scanners that have
@@ -175,13 +168,10 @@
     /// Skip this text object.
     bool SkipText(Status*);

-    /// If 'batch' is not NULL and 'contains_tuple_data_' is true, attaches all completed
-    /// io buffers and the boundary mem pool to 'batch'. If 'done' is set, all in-flight
-    /// resources are also attached or released.
-    /// If 'batch' is NULL then 'done' must be true or 'contains_tuple_data_' false. Such
-    /// a call will release all completed resources. If 'done' is true all in-flight
-    /// resources are also freed.
-    void ReleaseCompletedResources(RowBatch* batch, bool done);
+    /// Release all completed resources in the context, i.e. I/O and boundary buffers
+    /// that the caller has finished reading. If 'done' is true all resources are
+    /// freed, even if the caller has not read that data yet.
+    void ReleaseCompletedResources(bool done);

    private:
     friend class ScannerContext;

@@ -189,10 +179,6 @@
     DiskIoMgr::ScanRange* scan_range_;
     const HdfsFileDesc* file_desc_;

-    /// If true, tuples will contain pointers into memory contained in this object.
-    /// That memory (io buffers or boundary buffers) must be attached to the row batch.
-    bool contains_tuple_data_;
-
     /// Total number of bytes returned from GetBytes()
     int64_t total_bytes_returned_;

@@ -240,7 +226,7 @@
     /// List of buffers that are completed but still have bytes referenced by the caller.
     /// On the next GetBytes() call, these buffers are released (the caller by calling
     /// GetBytes() signals it is done with its previous bytes). At this point the
-    /// buffers are either returned to the io mgr or attached to the current row batch.
+    /// buffers are returned to the I/O manager.
     std::deque<std::unique_ptr<DiskIoMgr::BufferDescriptor>> completed_io_buffers_;

     Stream(ScannerContext* parent);

@@ -280,22 +266,23 @@
     return streams_[idx].get();
   }

-  /// If a non-NULL 'batch' is passed, attaches completed io buffers and boundary mem pools
-  /// from all streams to 'batch'. Attaching only completed resources ensures that buffers
-  /// (and their cleanup) trail the rows that reference them (row batches are consumed and
-  /// cleaned up in order by the rest of the query).
-  /// If 'done' is true, this is the final call for the current streams and any pending
-  /// resources in each stream are also passed to the row batch. Callers which want to
-  /// clear the streams from the context should also call ClearStreams().
-  ///
-  /// A NULL 'batch' may be passed to free all resources. It is only valid to pass a NULL
-  /// 'batch' when also passing 'done'.
+  /// Returns completed I/O buffers to the I/O manager. If 'done' is true, this is the
+  /// final call for the current streams and any pending resources in each stream are
+  /// also freed. Callers which want to clear the streams from the context should also
+  /// call ClearStreams().
   ///
   /// This must be called with 'done' set when the scanner is complete and no longer needs
   /// any resources (e.g. tuple memory, io buffers) returned from the current streams.
   /// After calling with 'done' set, this should be called again if new streams are
   /// created via AddStream().
-  void ReleaseCompletedResources(RowBatch* batch, bool done);
+  void ReleaseCompletedResources(bool done);
+
+  /// Overload with the signature expected by Impala-lzo to enable easier staging of
+  /// the API change. TODO: remove this once Impala-lzo is updated to use the new
+  /// signature.
+  void ReleaseCompletedResources(RowBatch* batch, bool done) {
+    ReleaseCompletedResources(done);
+  }

   /// Releases all the Stream objects in the vector 'streams_' and reduces the vector's
   /// size to 0.


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/exec/text-converter.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/text-converter.cc b/be/src/exec/text-converter.cc
index 0cad05c..3bb65f2 100644
--- a/be/src/exec/text-converter.cc
+++ b/be/src/exec/text-converter.cc
@@ -103,8 +103,8 @@ void TextConverter::UnescapeString(const char* src, char* dest, int* len,
 //   store i8 %null_bit_set, i8* %null_byte_ptr
 //   ret i1 false
 //}
-
-
+// TODO: convert this function to use cross-compilation + constant substitution in whole
+// or part. It is currently too complex and doesn't implement the full functionality.
 Status TextConverter::CodegenWriteSlot(LlvmCodeGen* codegen,
     TupleDescriptor* tuple_desc, SlotDescriptor* slot_desc, Function** fn,
     const char* null_col_val, int len, bool check_null, bool strict_mode) {


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/runtime/row-batch.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index beed671..a6e935a 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -45,7 +45,7 @@ RowBatch::RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* mem_
     flush_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(row_desc->tuple_descriptors().size()),
-    auxiliary_mem_usage_(0),
+    attached_buffer_bytes_(0),
     tuple_data_pool_(mem_tracker),
     row_desc_(row_desc),
     mem_tracker_(mem_tracker) {
@@ -72,7 +72,7 @@ RowBatch::RowBatch(
     flush_(FlushMode::NO_FLUSH_RESOURCES),
     needs_deep_copy_(false),
     num_tuples_per_row_(input_batch.row_tuples.size()),
-    auxiliary_mem_usage_(0),
+    attached_buffer_bytes_(0),
     tuple_data_pool_(mem_tracker),
     row_desc_(row_desc),
     mem_tracker_(mem_tracker) {
@@ -144,9 +144,6 @@ RowBatch::RowBatch(
 RowBatch::~RowBatch() {
   tuple_data_pool_.FreeAll();
-  for (int i = 0; i < io_buffers_.size(); ++i) {
-    ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i]));
-  }
   for (BufferInfo& buffer_info : buffers_) {
     ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
         buffer_info.client, &buffer_info.buffer);
@@ -285,16 +282,9 @@ void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
   DCHECK_EQ(offset, size);
 }

-void RowBatch::AddIoBuffer(unique_ptr<DiskIoMgr::BufferDescriptor> buffer) {
-  DCHECK(buffer != NULL);
-  auxiliary_mem_usage_ += buffer->buffer_len();
-  buffer->TransferOwnership(mem_tracker_);
-  io_buffers_.emplace_back(move(buffer));
-}
-
 void RowBatch::AddBuffer(BufferPool::ClientHandle* client,
     BufferPool::BufferHandle&& buffer, FlushMode flush) {
-  auxiliary_mem_usage_ += buffer.len();
+  attached_buffer_bytes_ += buffer.len();
   BufferInfo buffer_info;
   buffer_info.client = client;
   buffer_info.buffer = std::move(buffer);
@@ -307,26 +297,18 @@ void RowBatch::Reset() {
   capacity_ = tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*));
   // TODO: Change this to Clear() and investigate the repercussions.
   tuple_data_pool_.FreeAll();
-  for (int i = 0; i < io_buffers_.size(); ++i) {
-    ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffers_[i]));
-  }
-  io_buffers_.clear();
   for (BufferInfo& buffer_info : buffers_) {
     ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
         buffer_info.client, &buffer_info.buffer);
   }
   buffers_.clear();
-  auxiliary_mem_usage_ = 0;
+  attached_buffer_bytes_ = 0;
   flush_ = FlushMode::NO_FLUSH_RESOURCES;
   needs_deep_copy_ = false;
 }

 void RowBatch::TransferResourceOwnership(RowBatch* dest) {
   dest->tuple_data_pool_.AcquireData(&tuple_data_pool_, false);
-  for (int i = 0; i < io_buffers_.size(); ++i) {
-    dest->AddIoBuffer(move(io_buffers_[i]));
-  }
-  io_buffers_.clear();
   for (BufferInfo& buffer_info : buffers_) {
     dest->AddBuffer(
         buffer_info.client, std::move(buffer_info.buffer), FlushMode::NO_FLUSH_RESOURCES);
@@ -362,7 +344,7 @@ void RowBatch::AcquireState(RowBatch* src) {
   // The destination row batch should be empty.
   DCHECK(!needs_deep_copy_);
   DCHECK_EQ(num_rows_, 0);
-  DCHECK_EQ(auxiliary_mem_usage_, 0);
+  DCHECK_EQ(attached_buffer_bytes_, 0);

   num_rows_ = src->num_rows_;
   capacity_ = src->capacity_;


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/19c17e64/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 49dd066..5a7edd4 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -43,16 +43,18 @@ class TupleDescriptor;
 /// A RowBatch encapsulates a batch of rows, each composed of a number of tuples.
 /// The maximum number of rows is fixed at the time of construction.
-/// The row batch reference a few different sources of memory.
-/// 1. TupleRow ptrs - may be malloc'd and owned by the RowBatch or allocated from
-///    the tuple pool, depending on whether legacy joins and aggs are enabled.
-///    See the comment on tuple_ptrs_ for more details.
-/// 2. Tuple memory - this is allocated (or transferred to) the row batches tuple pool.
-/// 3. Auxiliary tuple memory (e.g. string data) - this can either be stored externally
-///    (don't copy strings) or from the tuple pool (strings are copied). If external,
-///    the data is in an io buffer that may not be attached to this row batch. The
-///    creator of that row batch has to make sure that the io buffer is not recycled
-///    until all batches that reference the memory have been consumed.
+/// The row batch can reference various types of memory.
+/// 1. TupleRow ptrs - malloc'd and owned by the RowBatch. See the comment on
+///    tuple_ptrs_ for more details.
+/// 2. Fixed and variable-length tuple data. This memory may be directly attached to
+///    the batch: either in the batch's MemPool or in an attached buffer. Or it may
+///    live elsewhere - either in a subsequent batch returned by an ExecNode or
+///    still be owned by the ExecNode that produced the batch. In those cases the
+///    owner of this RowBatch must be careful not to close the producing ExecNode
+///    or free resources from trailing batches while the batch's data is still being
+///    used.
+/// TODO: IMPALA-4179: simplify the ownership transfer model.
+///
 /// In order to minimize memory allocations, RowBatches and TRowBatches that have been
 /// serialized and sent over the wire should be reused (this prevents compression_scratch_
 /// from being needlessly reallocated).
@@ -62,13 +64,13 @@ class TupleDescriptor;
 /// and reference memory outside of the row batch. This results in most row batches
 /// having a very small memory footprint and in some row batches having a very large
 /// one (it contains all the memory that other row batches are referencing). An example
-/// is IoBuffers which are only attached to one row batch. Only when the row batch reaches
+/// is buffers which are only attached to one row batch. Only when the row batch reaches
 /// a blocking operator or the root of the fragment is the row batch memory freed.
 /// This means that in some cases (e.g. very selective queries), we still need to
 /// pass the row batch through the exec nodes (even if they have no rows) to trigger
-/// memory deletion. AtCapacity() encapsulates the check that we are not accumulating
-/// excessive memory.
-//
+/// memory deletion. AtCapacity() encapsulates the check that the batch does not have
+/// excessive memory attached to it.
+///
 /// A row batch is considered at capacity if all the rows are full or it has accumulated
 /// auxiliary memory up to a soft cap. (See at_capacity_mem_usage_ comment).
 class RowBatch {
@@ -138,7 +140,7 @@
     // MarkFlushResources().
     DCHECK((!needs_deep_copy_ && flush_ == FlushMode::NO_FLUSH_RESOURCES) ||
         num_rows_ == capacity_);
-    int64_t mem_usage = auxiliary_mem_usage_ + tuple_data_pool_.total_allocated_bytes();
+    int64_t mem_usage = attached_buffer_bytes_ + tuple_data_pool_.total_allocated_bytes();
     return num_rows_ == capacity_ || mem_usage >= AT_CAPACITY_MEM_USAGE;
   }

@@ -203,24 +205,18 @@
   };

   int num_tuples_per_row() { return num_tuples_per_row_; }
-  int row_byte_size() { return num_tuples_per_row_ * sizeof(Tuple*); }
   MemPool* tuple_data_pool() { return &tuple_data_pool_; }
-  int num_io_buffers() const { return io_buffers_.size(); }
   int num_buffers() const { return buffers_.size(); }

   /// Resets the row batch, returning all resources it has accumulated.
   void Reset();

-  /// Add io buffer to this row batch.
-  void AddIoBuffer(std::unique_ptr<DiskIoMgr::BufferDescriptor> buffer);
-
   /// Adds a buffer to this row batch. The buffer is deleted when freeing resources.
   /// The buffer's memory remains accounted against the original owner, even when the
   /// ownership of batches is transferred. If the original owner wants the memory to be
   /// released, it should call this with 'mode' FLUSH_RESOURCES (see MarkFlushResources()
   /// for further explanation).
-  /// TODO: IMPALA-4179: after IMPALA-3200, simplify the ownership transfer model and
-  /// make it consistent between buffers and I/O buffers.
+  /// TODO: IMPALA-4179: simplify the ownership transfer model.
   void AddBuffer(BufferPool::ClientHandle* client, BufferPool::BufferHandle&& buffer,
       FlushMode flush);

@@ -230,10 +226,10 @@
   /// can be added. The "flush" mark is transferred by TransferResourceOwnership(). This
   /// ensures that batches are flushed by streaming operators all the way up the operator
   /// tree. Blocking operators can still accumulate batches with this flag.
-  /// TODO: IMPALA-3200: blocking operators should acquire all memory resources including
-  /// attached blocks/buffers, so that MarkFlushResources() can guarantee that the
+  /// TODO: IMPALA-4179: blocking operators should acquire all memory resources including
+  /// attached buffers, so that MarkFlushResources() can guarantee that the
   /// resources will not be accounted against the original operator (this is currently
-  /// not true for Blocks, which can't be transferred).
+  /// not true for buffers, which aren't transferred).
   void MarkFlushResources() {
     DCHECK_LE(num_rows_, capacity_);
     capacity_ = num_rows_;
@@ -256,7 +252,7 @@
   bool needs_deep_copy() { return needs_deep_copy_; }

   /// Transfer ownership of resources to dest. This includes tuple data in mem
-  /// pool and io buffers.
+  /// pool and buffers.
   void TransferResourceOwnership(RowBatch* dest);

   void CopyRow(TupleRow* src, TupleRow* dest) {
@@ -277,7 +273,7 @@
     memset(row, 0, num_tuples_per_row_ * sizeof(Tuple*));
   }

-  /// Acquires state from the 'src' row batch into this row batch. This includes all IO
+  /// Acquires state from the 'src' row batch into this row batch. This includes all
   /// buffers and tuple data.
   /// This row batch must be empty and have the same row descriptor as the src batch.
   /// This is used for scan nodes which produce RowBatches asynchronously. Typically,
@@ -399,9 +395,8 @@
   int tuple_ptrs_size_;
   Tuple** tuple_ptrs_;

-  /// Sum of all auxiliary bytes. This includes IoBuffers and memory from
-  /// TransferResourceOwnership().
-  int64_t auxiliary_mem_usage_;
+  /// Total bytes of BufferPool buffers attached to this batch.
+  int64_t attached_buffer_bytes_;

   /// holding (some of the) data referenced by rows
   MemPool tuple_data_pool_;

@@ -415,11 +410,6 @@
   MemTracker* mem_tracker_;  // not owned

-  /// IO buffers current owned by this row batch. Ownership of IO buffers transfer
-  /// between row batches. Any IO buffer will be owned by at most one row batch
-  /// (i.e. they are not ref counted) so most row batches don't own any.
-  std::vector<std::unique_ptr<DiskIoMgr::BufferDescriptor>> io_buffers_;
-
   struct BufferInfo {
     BufferPool::ClientHandle* client;
     BufferPool::BufferHandle buffer;
