IMPALA-3905: Implements HdfsScanner::GetNext() for text scans.

Implements HdfsTextScanner::GetNext() and changes ProcessSplit() to
repeatedly call GetNext(), so that the legacy ProcessSplit() interface
and the new GetNext() interface share the core scanning code.
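To make that sharing pattern concrete, here is a minimal, stand-alone C++ sketch (simplified stand-in types, not the actual Impala RowBatch/HdfsScanner classes) of how a legacy ProcessSplit()-style loop can be layered on top of a GetNext()-style interface so both paths run the same core scanning code:

// Sketch only: simplified stand-in types, not Impala code.
#include <memory>
#include <utility>
#include <vector>

struct SketchRowBatch { std::vector<int> rows; };  // stand-in for RowBatch

class SketchScanner {
 public:
  // New-style interface: fill one caller-provided batch and set eos_ when the
  // scan range is exhausted.
  bool GetNext(SketchRowBatch* batch) {
    batch->rows.push_back(42);  // pretend we parsed one row
    eos_ = true;                // pretend the range is now exhausted
    return true;
  }

  // Legacy interface: owns the batch lifecycle and enqueues completed batches,
  // but delegates all scanning work to GetNext().
  bool ProcessSplit(std::vector<std::unique_ptr<SketchRowBatch>>* queue) {
    do {
      auto batch = std::make_unique<SketchRowBatch>();
      if (!GetNext(batch.get())) return false;
      queue->push_back(std::move(batch));  // hand off the completed batch
    } while (!eos_);
    return true;
  }

 private:
  bool eos_ = false;
};

The ProcessSplit() implementations in the hdfs-parquet-scanner.cc and hdfs-text-scanner.cc hunks below follow this shape.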
These changes were tricky:
- The scanner used to rely on the ability to attach a batch to the
  row-batch queue for freeing resources
- This patch attempts to preserve the resource-freeing behavior by
  clearing resources as soon as they are complete
- In particular, the scanner attempts to skip corrupt/invalid data
  blocks, and we should avoid accumulating memory unnecessarily

The other changes are mostly straightforward:
- Add a RowBatch parameter to various functions
- Add a MemPool parameter to various functions for attaching memory of
  completed resources that may still be referenced by returned batches
- Change Close() to free all resources when a nullptr RowBatch is passed

Testing:
- Exhaustive tests passed on debug
- Core tests passed on asan
- TODO: Perf testing on cluster

Change-Id: Id193aa223434d7cc40061a42f81bbb29dcd0404b
Reviewed-on: http://gerrit.cloudera.org:8080/6000
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Alex Behm <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/be8d1512
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/be8d1512
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/be8d1512

Branch: refs/heads/master
Commit: be8d15122de9b1ee35bdd2de43b8daaaa7547098
Parents: 85d7f5e
Author: Alex Behm <[email protected]>
Authored: Tue Sep 13 10:05:00 2016 -0700
Committer: Alex Behm <[email protected]>
Committed: Tue Apr 4 01:21:07 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             |  12 +-
 be/src/exec/hdfs-parquet-scanner.h              |   5 -
 be/src/exec/hdfs-scan-node-base.cc              |  13 +
 be/src/exec/hdfs-scan-node-base.h               |  13 +
 be/src/exec/hdfs-scan-node-mt.cc                |   3 +-
 be/src/exec/hdfs-scan-node.cc                   |  13 -
 be/src/exec/hdfs-scan-node.h                    |  15 -
 be/src/exec/hdfs-scanner-ir.cc                  |   2 -
 be/src/exec/hdfs-scanner.cc                     |  26 +-
 be/src/exec/hdfs-scanner.h                      |  24 +-
 be/src/exec/hdfs-text-scanner.cc                | 298 +++++++++++--------
 be/src/exec/hdfs-text-scanner.h                 |  80 +++--
 be/src/exec/scanner-context.cc                  |   2 +-
 be/src/exec/scanner-context.h                   |   5 +-
 .../org/apache/impala/planner/HdfsScanNode.java |   4 +-
 15 files changed, 296 insertions(+), 219 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index f21ccff..aba2bef 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -404,7 +404,8 @@ Status HdfsParquetScanner::ProcessSplit() { DCHECK(scan_node_->HasRowBatchQueue()); HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_); do { - StartNewParquetRowBatch(); + batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(), + scan_node_->mem_tracker()); RETURN_IF_ERROR(GetNextInternal(batch_)); scan_node->AddMaterializedRowBatch(batch_); ++row_batches_produced_; @@ -414,7 +415,8 @@ } while (!eos_ && !scan_node_->ReachedLimit()); // Transfer the remaining resources to this new batch in Close().
- StartNewParquetRowBatch(); + batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(), + scan_node_->mem_tracker()); return Status::OK(); } @@ -976,12 +978,6 @@ Status HdfsParquetScanner::AssembleRows( return Status::OK(); } -void HdfsParquetScanner::StartNewParquetRowBatch() { - DCHECK(scan_node_->HasRowBatchQueue()); - batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(), - scan_node_->mem_tracker()); -} - Status HdfsParquetScanner::CommitRows(RowBatch* dst_batch, int num_rows) { DCHECK(dst_batch != NULL); dst_batch->CommitRows(num_rows); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-parquet-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h index 34bfcc9..18c14fe 100644 --- a/be/src/exec/hdfs-parquet-scanner.h +++ b/be/src/exec/hdfs-parquet-scanner.h @@ -509,11 +509,6 @@ class HdfsParquetScanner : public HdfsScanner { Status AssembleRows(const std::vector<ParquetColumnReader*>& column_readers, RowBatch* row_batch, bool* skip_row_group); - /// Set 'batch_' to a new row batch. Unlike the similarly named function in - /// HdfsScanner, this function will not allocate the tuple buffer. Only valid - /// to call if 'add_batches_to_queue_' is true. - void StartNewParquetRowBatch(); - /// Commit num_rows to the given row batch. /// Returns OK if the query is not cancelled and hasn't exceeded any mem limits. /// Scanner can call this with 0 rows to flush any pending resources (attached pools http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index e478b69..238155f 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -625,6 +625,19 @@ HdfsFileDesc* HdfsScanNodeBase::GetFileDesc(const string& filename) { return file_descs_[filename]; } +void HdfsScanNodeBase::SetFileMetadata(const string& filename, void* metadata) { + unique_lock<mutex> l(metadata_lock_); + DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end()); + per_file_metadata_[filename] = metadata; +} + +void* HdfsScanNodeBase::GetFileMetadata(const string& filename) { + unique_lock<mutex> l(metadata_lock_); + map<string, void*>::iterator it = per_file_metadata_.find(filename); + if (it == per_file_metadata_.end()) return NULL; + return it->second; +} + void* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) { CodegendFnMap::iterator it = codegend_fn_map_.find(type); if (it == codegend_fn_map_.end()) return NULL; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index cde9574..7777d4d 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -233,6 +233,14 @@ class HdfsScanNodeBase : public ScanNode { /// Returns the file desc for 'filename'. Returns NULL if filename is invalid. HdfsFileDesc* GetFileDesc(const std::string& filename); + /// Sets the scanner specific metadata for 'filename'. Scanners can use this to store + /// file header information. Thread safe. 
+ void SetFileMetadata(const std::string& filename, void* metadata); + + /// Returns the scanner specific metadata for 'filename'. Returns NULL if there is no + /// metadata. Thread safe. + void* GetFileMetadata(const std::string& filename); + /// Called by scanners when a range is complete. Used to record progress. /// This *must* only be called after a scanner has completely finished its /// scan range (i.e. context->Flush()), and has returned the final row batch. @@ -336,6 +344,11 @@ class HdfsScanNodeBase : public ScanNode { typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>> FileFormatsMap; FileFormatsMap per_type_files_; + /// Scanner specific per file metadata (e.g. header information) and associated lock. + /// TODO: Remove this lock when removing the legacy scanners and scan nodes. + boost::mutex metadata_lock_; + std::map<std::string, void*> per_file_metadata_; + /// Conjuncts for each materialized tuple (top-level row batch tuples and collection /// item tuples). Includes a copy of ExecNode.conjuncts_. ConjunctsMap conjuncts_map_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scan-node-mt.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc index 50936b8..9fad46e 100644 --- a/be/src/exec/hdfs-scan-node-mt.cc +++ b/be/src/exec/hdfs-scan-node-mt.cc @@ -45,7 +45,8 @@ Status HdfsScanNodeMt::Prepare(RuntimeState* state) { // Return an error if this scan node has been assigned a range that is not supported // because the scanner of the corresponding file format does implement GetNext(). for (const auto& files: per_type_files_) { - if (!files.second.empty() && files.first != THdfsFileFormat::PARQUET) { + if (!files.second.empty() && files.first != THdfsFileFormat::PARQUET + && files.first != THdfsFileFormat::TEXT) { stringstream msg; msg << "Unsupported file format with HdfsScanNodeMt: " << files.first; return Status(msg.str()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 03217e0..c3972b2 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -239,19 +239,6 @@ void HdfsScanNode::Close(RuntimeState* state) { HdfsScanNodeBase::Close(state); } -void HdfsScanNode::SetFileMetadata(const string& filename, void* metadata) { - unique_lock<mutex> l(metadata_lock_); - DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end()); - per_file_metadata_[filename] = metadata; -} - -void* HdfsScanNode::GetFileMetadata(const string& filename) { - unique_lock<mutex> l(metadata_lock_); - map<string, void*>::iterator it = per_file_metadata_.find(filename); - if (it == per_file_metadata_.end()) return NULL; - return it->second; -} - void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type, const std::vector<THdfsCompression::type>& compression_type) { lock_guard<SpinLock> l(file_type_counts_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h index f8064b1..41647da 100644 --- a/be/src/exec/hdfs-scan-node.h +++ b/be/src/exec/hdfs-scan-node.h @@ -85,16 +85,6 @@ class HdfsScanNode : public HdfsScanNodeBase { 
/// This function will block if materialized_row_batches_ is full. void AddMaterializedRowBatch(RowBatch* row_batch); - /// Sets the scanner specific metadata for 'filename'. - /// This is thread safe. - void SetFileMetadata(const std::string& filename, void* metadata); - - /// Gets scanner specific metadata for 'filename'. Scanners can use this to store - /// file header information. - /// Returns NULL if there is no metadata. - /// This is thread safe. - void* GetFileMetadata(const std::string& filename); - /// Called by scanners when a range is complete. Used to record progress and set done_. /// This *must* only be called after a scanner has completely finished its /// scan range (i.e. context->Flush()), and has added the final row batch to the row @@ -115,11 +105,6 @@ class HdfsScanNode : public HdfsScanNodeBase { /// scanner threads. int64_t scanner_thread_bytes_required_; - /// Scanner specific per file metadata (e.g. header information) and associated lock. - /// This lock cannot be taken together with any other locks except lock_. - boost::mutex metadata_lock_; - std::map<std::string, void*> per_file_metadata_; - /// Thread group for all scanner worker threads ThreadGroup scanner_threads_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scanner-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc index e6da220..5b474d1 100644 --- a/be/src/exec/hdfs-scanner-ir.cc +++ b/be/src/exec/hdfs-scanner-ir.cc @@ -36,8 +36,6 @@ using namespace impala; int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int row_size, FieldLocation* fields, int num_tuples, int max_added_tuples, int slots_per_tuple, int row_idx_start) { - - DCHECK(scan_node_->HasRowBatchQueue()); DCHECK(tuple_ != NULL); uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(tuple_row); uint8_t* tuple_mem = reinterpret_cast<uint8_t*>(tuple_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index 64a922d..d613b15 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -192,26 +192,30 @@ Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool return Status::OK(); } -// TODO(skye): have this check scan_node_->ReachedLimit() and get rid of manual check? -Status HdfsScanner::CommitRows(int num_rows) { - DCHECK(scan_node_->HasRowBatchQueue()); - DCHECK(batch_ != NULL); - DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows()); - batch_->CommitRows(num_rows); +Status HdfsScanner::CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row_batch) { + DCHECK(batch_ != NULL || !scan_node_->HasRowBatchQueue()); + DCHECK(batch_ == row_batch || !scan_node_->HasRowBatchQueue()); + DCHECK(!enqueue_if_full || scan_node_->HasRowBatchQueue()); + DCHECK_LE(num_rows, row_batch->capacity() - row_batch->num_rows()); + row_batch->CommitRows(num_rows); tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * num_rows; + tuple_ = reinterpret_cast<Tuple*>(tuple_mem_); // We need to pass the row batch to the scan node if there is too much memory attached, // which can happen if the query is very selective. We need to release memory even // if no rows passed predicates. 
- if (batch_->AtCapacity() || context_->num_completed_io_buffers() > 0) { - context_->ReleaseCompletedResources(batch_, /* done */ false); - static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(batch_); - RETURN_IF_ERROR(StartNewRowBatch()); + if (row_batch->AtCapacity() || context_->num_completed_io_buffers() > 0) { + context_->ReleaseCompletedResources(row_batch, /* done */ false); + if (enqueue_if_full) { + static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch); + RETURN_IF_ERROR(StartNewRowBatch()); + } } if (context_->cancelled()) return Status::CANCELLED; // Check for UDF errors. RETURN_IF_ERROR(state_->GetQueryStatus()); - // Free local expr allocations for this thread + // Free local expr allocations for this thread to avoid accumulating too much + // memory from evaluating the scanner conjuncts. for (const auto& entry: scanner_conjuncts_map_) { ExprContext::FreeLocalAllocations(entry.second); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h index df21fb2..f61c5fc 100644 --- a/be/src/exec/hdfs-scanner.h +++ b/be/src/exec/hdfs-scanner.h @@ -322,13 +322,23 @@ class HdfsScanner { Status GetCollectionMemory(CollectionValueBuilder* builder, MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows); - /// Commit num_rows to the current row batch. If this completes, the row batch is - /// enqueued with the scan node and StartNewRowBatch() is called. - /// Returns Status::OK if the query is not cancelled and hasn't exceeded any mem limits. - /// Scanner can call this with 0 rows to flush any pending resources (attached pools - /// and io buffers) to minimize memory consumption. - /// Only valid to call if the parent scan node is multi-threaded. - Status CommitRows(int num_rows); + /// Commits 'num_rows' to 'row_batch'. Advances 'tuple_mem_' and 'tuple_' accordingly. + /// Attaches completed resources from 'context_' to 'row_batch' if necessary. + /// Frees local expr allocations. + /// If 'enqueue_if_full' is true and 'row_batch' is at capacity after committing the + /// rows, then 'row_batch' is added to the queue, and a new batch is created with + /// StartNewRowBatch(). It is only valid to pass true for 'enqueue_if_full' if the + /// parent parent scan node is multi-threaded. + /// Returns non-OK if 'context_' is cancelled or the query status in 'state_' is + /// non-OK. + Status CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row_batch); + + /// Calls the above CommitRows() passing true for 'queue_if_full', and 'batch_' as the + /// row batch. Only valid to call if the parent scan node is multi-threaded. + Status CommitRows(int num_rows) { + DCHECK(scan_node_->HasRowBatchQueue()); + return CommitRows(num_rows, true, batch_); + } /// Release all memory in 'pool' to batch_. If 'commit_batch' is true, the row batch /// will be committed. 
'commit_batch' should be true if the attached pool is expected http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index 07281df..ced7ab1 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -52,15 +52,20 @@ const int64_t COMPRESSED_DATA_FIXED_READ_SIZE = 1 * 1024 * 1024; HdfsTextScanner::HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* state) : HdfsScanner(scan_node, state), - byte_buffer_ptr_(NULL), - byte_buffer_end_(NULL), + byte_buffer_ptr_(nullptr), + byte_buffer_end_(nullptr), byte_buffer_read_size_(0), only_parsing_header_(false), + scan_state_(CONSTRUCTED), boundary_pool_(new MemPool(scan_node->mem_tracker())), boundary_row_(boundary_pool_.get()), boundary_column_(boundary_pool_.get()), slot_idx_(0), - error_in_row_(false) { + batch_start_ptr_(nullptr), + error_in_row_(false), + partial_tuple_(nullptr), + partial_tuple_empty_(true), + parse_delimiter_timer_(nullptr) { } HdfsTextScanner::~HdfsTextScanner() { @@ -149,43 +154,29 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, Status HdfsTextScanner::ProcessSplit() { DCHECK(scan_node_->HasRowBatchQueue()); - - // Reset state for new scan range - RETURN_IF_ERROR(InitNewRange()); - - // Find the first tuple. If tuple_found is false, it means we went through the entire - // scan range without finding a single tuple. The bytes will be picked up by the scan - // range before. - bool tuple_found; - RETURN_IF_ERROR(FindFirstTuple(&tuple_found)); - - if (tuple_found) { - // Update the decompressor depending on the compression type of the file in the - // context. - DCHECK(stream_->file_desc()->file_compression != THdfsCompression::SNAPPY) - << "FE should have generated SNAPPY_BLOCKED instead."; - RETURN_IF_ERROR(UpdateDecompressor(stream_->file_desc()->file_compression)); - - // Process the scan range. - int dummy_num_tuples; - RETURN_IF_ERROR(ProcessRange(&dummy_num_tuples, false)); - - // Finish up reading past the scan range. - RETURN_IF_ERROR(FinishScanRange()); - } - - // All done with this scan range. + HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_); + do { + batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(), + scan_node_->mem_tracker()); + RETURN_IF_ERROR(GetNextInternal(batch_)); + scan_node->AddMaterializedRowBatch(batch_); + } while (!eos_ && !scan_node_->ReachedLimit()); + + // Transfer the remaining resources to this new batch in Close(). + batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(), + scan_node_->mem_tracker()); return Status::OK(); } void HdfsTextScanner::Close(RowBatch* row_batch) { - // Need to close the decompressor before releasing the resources at AddFinalRowBatch(), - // because in some cases there is memory allocated in decompressor_'s temp_memory_pool_. - if (decompressor_.get() != NULL) { + // Need to close the decompressor before transferring the remaining resources to + // 'row_batch' because in some cases there is memory allocated in the decompressor_'s + // temp_memory_pool_. 
+ if (decompressor_ != nullptr) { decompressor_->Close(); - decompressor_.reset(NULL); + decompressor_.reset(); } - if (row_batch != NULL) { + if (row_batch != nullptr) { row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false); row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false); row_batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), false); @@ -194,10 +185,13 @@ void HdfsTextScanner::Close(RowBatch* row_batch) { static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch); } } else { - if (template_tuple_pool_.get() != NULL) template_tuple_pool_->FreeAll(); + if (template_tuple_pool_ != nullptr) template_tuple_pool_->FreeAll(); + if (data_buffer_pool_ != nullptr) data_buffer_pool_->FreeAll(); + if (boundary_pool_ != nullptr) boundary_pool_->FreeAll(); + context_->ReleaseCompletedResources(nullptr, true); } - // Verify all resources (if any) have been transferred. + // Verify all resources (if any) have been transferred or freed. DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0); DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0); DCHECK_EQ(boundary_pool_.get()->total_allocated_bytes(), 0); @@ -210,12 +204,18 @@ void HdfsTextScanner::Close(RowBatch* row_batch) { } Status HdfsTextScanner::InitNewRange() { + DCHECK_EQ(scan_state_, CONSTRUCTED); // Compressed text does not reference data in the io buffers directly. In such case, we // can recycle the buffers in the stream_ more promptly. if (stream_->file_desc()->file_compression != THdfsCompression::NONE) { stream_->set_contains_tuple_data(false); } + // Update the decompressor based on the compression type of the file in the context. + DCHECK(stream_->file_desc()->file_compression != THdfsCompression::SNAPPY) + << "FE should have generated SNAPPY_BLOCKED instead."; + RETURN_IF_ERROR(UpdateDecompressor(stream_->file_desc()->file_compression)); + HdfsPartitionDescriptor* hdfs_partition = context_->partition_descriptor(); char field_delim = hdfs_partition->field_delim(); char collection_delim = hdfs_partition->collection_delim(); @@ -233,38 +233,34 @@ Status HdfsTextScanner::InitNewRange() { state_->strict_mode())); RETURN_IF_ERROR(ResetScanner()); + scan_state_ = SCAN_RANGE_INITIALIZED; return Status::OK(); } Status HdfsTextScanner::ResetScanner() { - error_in_row_ = false; - - // Note - this initialisation relies on the assumption that N partition keys will occupy - // entries 0 through N-1 in column_idx_to_slot_idx. If this changes, we will need - // another layer of indirection to map text-file column indexes onto the - // column_idx_to_slot_idx table used below. + // Assumes that N partition keys occupy entries 0 through N-1 in materialized_slots_. + // If this changes, we will need another layer of indirection to map text-file column + // indexes to their destination slot. 
slot_idx_ = 0; + error_in_row_ = false; boundary_column_.Clear(); boundary_row_.Clear(); delimited_text_parser_->ParserReset(); - + byte_buffer_ptr_ = byte_buffer_end_ = nullptr; + partial_tuple_ = Tuple::Create(tuple_byte_size_, boundary_pool_.get()); partial_tuple_empty_ = true; - byte_buffer_ptr_ = byte_buffer_end_ = NULL; - - partial_tuple_ = - Tuple::Create(scan_node_->tuple_desc()->byte_size(), boundary_pool_.get()); // Initialize codegen fn RETURN_IF_ERROR(InitializeWriteTuplesFn( - context_->partition_descriptor(), THdfsFileFormat::TEXT, "HdfsTextScanner")); + context_->partition_descriptor(), THdfsFileFormat::TEXT, "HdfsTextScanner")); return Status::OK(); } -Status HdfsTextScanner::FinishScanRange() { - if (scan_node_->ReachedLimit()) return Status::OK(); - +Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) { + DCHECK(!row_batch->AtCapacity()); DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_); + bool split_delimiter; RETURN_IF_ERROR(CheckForSplitDelimiter(&split_delimiter)); if (split_delimiter) { @@ -275,11 +271,13 @@ Status HdfsTextScanner::FinishScanRange() { DCHECK(partial_tuple_empty_); DCHECK(boundary_column_.IsEmpty()); DCHECK(boundary_row_.IsEmpty()); + scan_state_ = DONE; return Status::OK(); } // For text we always need to scan past the scan range to find the next delimiter while (true) { + DCHECK_EQ(scan_state_, PAST_SCAN_RANGE); bool eosr = true; Status status = Status::OK(); byte_buffer_read_size_ = 0; @@ -287,8 +285,8 @@ Status HdfsTextScanner::FinishScanRange() { // If compressed text, then there is nothing more to be read. // TODO: calling FillByteBuffer() at eof() can cause // ScannerContext::Stream::GetNextBuffer to DCHECK. Fix this. - if (decompressor_.get() == NULL && !stream_->eof()) { - status = FillByteBuffer(&eosr, NEXT_BLOCK_READ_SIZE); + if (decompressor_.get() == nullptr && !stream_->eof()) { + status = FillByteBuffer(row_batch->tuple_data_pool(), &eosr, NEXT_BLOCK_READ_SIZE); } if (!status.ok() || byte_buffer_read_size_ == 0) { @@ -311,25 +309,25 @@ Status HdfsTextScanner::FinishScanRange() { RETURN_IF_ERROR(delimited_text_parser_->FillColumns<true>(boundary_column_.len(), &col, &num_fields, &field_locations_[0])); - MemPool* pool; - TupleRow* tuple_row_mem; - int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem); + TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow()); + int max_tuples = row_batch->capacity() - row_batch->num_rows(); DCHECK_GE(max_tuples, 1); // Set variables for proper error outputting on boundary tuple batch_start_ptr_ = boundary_row_.buffer(); row_end_locations_[0] = batch_start_ptr_ + boundary_row_.len(); - int num_tuples = WriteFields(pool, tuple_row_mem, num_fields, 1); + int num_tuples = + WriteFields(num_fields, 1, row_batch->tuple_data_pool(), tuple_row_mem); DCHECK_LE(num_tuples, 1); DCHECK_GE(num_tuples, 0); COUNTER_ADD(scan_node_->rows_read_counter(), num_tuples); - RETURN_IF_ERROR(CommitRows(num_tuples)); + RETURN_IF_ERROR(CommitRows(num_tuples, false, row_batch)); } else if (delimited_text_parser_->HasUnfinishedTuple()) { DCHECK(scan_node_->materialized_slots().empty()); DCHECK_EQ(scan_node_->num_materialized_partition_keys(), 0); // If no fields are materialized we do not update partial_tuple_empty_, // boundary_column_, or boundary_row_. However, we still need to handle the case // of partial tuple due to missing tuple delimiter at the end of file. 
- RETURN_IF_ERROR(CommitRows(1)); + RETURN_IF_ERROR(CommitRows(1, false, row_batch)); } break; } @@ -337,27 +335,29 @@ Status HdfsTextScanner::FinishScanRange() { DCHECK(eosr); int num_tuples; - RETURN_IF_ERROR(ProcessRange(&num_tuples, true)); + RETURN_IF_ERROR(ProcessRange(row_batch, &num_tuples)); if (num_tuples == 1) break; DCHECK_EQ(num_tuples, 0); } + scan_state_ = DONE; return Status::OK(); } -Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) { - bool eosr = past_scan_range || stream_->eosr(); +Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) { + DCHECK(scan_state_ == FIRST_TUPLE_FOUND || scan_state_ == PAST_SCAN_RANGE); + MemPool* pool = row_batch->tuple_data_pool(); + bool eosr = stream_->eosr() || scan_state_ == PAST_SCAN_RANGE; while (true) { if (!eosr && byte_buffer_ptr_ == byte_buffer_end_) { - RETURN_IF_ERROR(FillByteBuffer(&eosr)); + RETURN_IF_ERROR(FillByteBuffer(pool, &eosr)); } - MemPool* pool; - TupleRow* tuple_row_mem; - int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem); + TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow()); + int max_tuples = row_batch->capacity() - row_batch->num_rows(); - if (past_scan_range) { + if (scan_state_ == PAST_SCAN_RANGE) { // byte_buffer_ptr_ is already set from FinishScanRange() max_tuples = 1; eosr = true; @@ -389,7 +389,7 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) { RETURN_IF_ERROR(CopyBoundaryField(&field_locations_[0], pool)); boundary_column_.Clear(); } - num_tuples_materialized = WriteFields(pool, tuple_row_mem, num_fields, *num_tuples); + num_tuples_materialized = WriteFields(num_fields, *num_tuples, pool, tuple_row_mem); DCHECK_GE(num_tuples_materialized, 0); RETURN_IF_ERROR(parse_status_); if (*num_tuples > 0) { @@ -402,12 +402,13 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) { boundary_row_.Clear(); num_tuples_materialized = WriteTemplateTuples(tuple_row_mem, *num_tuples); } + COUNTER_ADD(scan_node_->rows_read_counter(), *num_tuples); // Save contents that are split across buffers if we are going to return this column if (col_start != byte_buffer_ptr_ && delimited_text_parser_->ReturnCurrentColumn()) { DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_); RETURN_IF_ERROR(boundary_column_.Append(col_start, byte_buffer_ptr_ - col_start)); - char* last_row = NULL; + char* last_row = nullptr; if (*num_tuples == 0) { last_row = batch_start_ptr_; } else { @@ -415,25 +416,65 @@ Status HdfsTextScanner::ProcessRange(int* num_tuples, bool past_scan_range) { } RETURN_IF_ERROR(boundary_row_.Append(last_row, byte_buffer_ptr_ - last_row)); } - COUNTER_ADD(scan_node_->rows_read_counter(), *num_tuples); + RETURN_IF_ERROR(CommitRows(num_tuples_materialized, false, row_batch)); - // Commit the rows to the row batch and scan node - RETURN_IF_ERROR(CommitRows(num_tuples_materialized)); + // Already past the scan range and attempting to complete the last row. + if (scan_state_ == PAST_SCAN_RANGE) break; - // Done with this buffer and the scan range - if ((byte_buffer_ptr_ == byte_buffer_end_ && eosr) || past_scan_range) { + // Scan range is done. Transition to PAST_SCAN_RANGE. 
+ if (byte_buffer_ptr_ == byte_buffer_end_ && eosr) { + scan_state_ = PAST_SCAN_RANGE; break; } - if (scan_node_->ReachedLimit()) return Status::OK(); + if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break; } return Status::OK(); } -Status HdfsTextScanner::FillByteBuffer(bool* eosr, int num_bytes) { +Status HdfsTextScanner::GetNextInternal(RowBatch* row_batch) { + DCHECK(!eos_); + DCHECK_GE(scan_state_, SCAN_RANGE_INITIALIZED); + DCHECK_NE(scan_state_, DONE); + + if (scan_state_ == SCAN_RANGE_INITIALIZED) { + // Find the first tuple. If tuple_found is false, it means we went through the entire + // scan range without finding a single tuple. The bytes will be picked up by the scan + // range before. + RETURN_IF_ERROR(FindFirstTuple(row_batch->tuple_data_pool())); + if (scan_state_ != FIRST_TUPLE_FOUND) { + eos_ = true; + scan_state_ = DONE; + return Status::OK(); + } + } + + int64_t tuple_buffer_size; + RETURN_IF_ERROR( + row_batch->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_mem_)); + tuple_ = reinterpret_cast<Tuple*>(tuple_mem_); + + if (scan_state_ == FIRST_TUPLE_FOUND) { + int num_tuples; + RETURN_IF_ERROR(ProcessRange(row_batch, &num_tuples)); + } + if (scan_node_->ReachedLimit()) { + eos_ = true; + scan_state_ = DONE; + return Status::OK(); + } + if (scan_state_ == PAST_SCAN_RANGE && !row_batch->AtCapacity()) { + RETURN_IF_ERROR(FinishScanRange(row_batch)); + DCHECK_EQ(scan_state_, DONE); + eos_ = true; + } + return Status::OK(); +} + +Status HdfsTextScanner::FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes) { *eosr = false; - if (decompressor_.get() == NULL) { + if (decompressor_.get() == nullptr) { Status status; if (num_bytes > 0) { stream_->GetBytes(num_bytes, reinterpret_cast<uint8_t**>(&byte_buffer_ptr_), @@ -447,7 +488,7 @@ Status HdfsTextScanner::FillByteBuffer(bool* eosr, int num_bytes) { *eosr = stream_->eosr(); } else if (decompressor_->supports_streaming()) { DCHECK_EQ(num_bytes, 0); - RETURN_IF_ERROR(FillByteBufferCompressedStream(eosr)); + RETURN_IF_ERROR(FillByteBufferCompressedStream(pool, eosr)); } else { DCHECK_EQ(num_bytes, 0); RETURN_IF_ERROR(FillByteBufferCompressedFile(eosr)); @@ -463,7 +504,7 @@ Status HdfsTextScanner::DecompressBufferStream(int64_t bytes_to_read, // decompress buffers that are read from stream_, so we don't need to read the // whole file in once. A compressed buffer is passed to ProcessBlockStreaming // but it may not consume all of the input. - uint8_t* compressed_buffer_ptr = NULL; + uint8_t* compressed_buffer_ptr = nullptr; int64_t compressed_buffer_size = 0; // We don't know how many bytes ProcessBlockStreaming() will consume so we set // peek=true and then later advance the stream using SkipBytes(). @@ -512,15 +553,18 @@ Status HdfsTextScanner::DecompressBufferStream(int64_t bytes_to_read, return Status::OK(); } -Status HdfsTextScanner::FillByteBufferCompressedStream(bool* eosr) { - // We're about to create a new decompression buffer (if we can't reuse). It's now - // safe to attach the current decompression buffer to batch_ because we know that - // it's the last row-batch that can possibly reference this buffer. +Status HdfsTextScanner::FillByteBufferCompressedStream(MemPool* pool, bool* eosr) { + // We're about to create a new decompression buffer (if we can't reuse). Attach the + // memory from previous decompression rounds to 'pool'. 
if (!decompressor_->reuse_output_buffer()) { - RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), false)); + if (pool != nullptr) { + pool->AcquireData(data_buffer_pool_.get(), false); + } else { + data_buffer_pool_->FreeAll(); + } } - uint8_t* decompressed_buffer = NULL; + uint8_t* decompressed_buffer = nullptr; int64_t decompressed_len = 0; // Set bytes_to_read = -1 because we don't know how much data decompressor need. // Just read the first available buffer within the scan range. @@ -543,7 +587,7 @@ Status HdfsTextScanner::FillByteBufferCompressedStream(bool* eosr) { if (*eosr) { DCHECK(stream_->eosr()); - context_->ReleaseCompletedResources(NULL, true); + context_->ReleaseCompletedResources(nullptr, true); } return Status::OK(); @@ -579,7 +623,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) { // Decompress and adjust the byte_buffer_ptr_ and byte_buffer_read_size_ accordingly. int64_t decompressed_len = 0; - uint8_t* decompressed_buffer = NULL; + uint8_t* decompressed_buffer = nullptr; SCOPED_TIMER(decompress_timer_); // TODO: Once the writers are in, add tests with very large compressed files (4GB) // that could overflow. @@ -588,7 +632,7 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) { &decompressed_buffer)); // Inform 'stream_' that the buffer with the compressed text can be released. - context_->ReleaseCompletedResources(NULL, true); + context_->ReleaseCompletedResources(nullptr, true); VLOG_FILE << "Decompressed " << byte_buffer_read_size_ << " to " << decompressed_len; byte_buffer_ptr_ = reinterpret_cast<char*>(decompressed_buffer); @@ -597,20 +641,22 @@ Status HdfsTextScanner::FillByteBufferCompressedFile(bool* eosr) { return Status::OK(); } -Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) { - *tuple_found = true; +Status HdfsTextScanner::FindFirstTuple(MemPool* pool) { + DCHECK_EQ(scan_state_, SCAN_RANGE_INITIALIZED); + // Either we're at the start of the file and thus skip all header lines, or we're in the // middle of the file and look for the next tuple. + bool tuple_found = true; int num_rows_to_skip = stream_->scan_range()->offset() == 0 ? scan_node_->skip_header_line_count() : 1; if (num_rows_to_skip > 0) { int num_skipped_rows = 0; - *tuple_found = false; bool eosr = false; + tuple_found = false; // Offset maybe not point to a tuple boundary, skip ahead to the first tuple start in // this scan range (if one exists). do { - RETURN_IF_ERROR(FillByteBuffer(&eosr)); + RETURN_IF_ERROR(FillByteBuffer(nullptr, &eosr)); delimited_text_parser_->ParserReset(); SCOPED_TIMER(parse_delimiter_timer_); @@ -624,23 +670,23 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) { bytes_left -= next_tuple_offset; ++num_skipped_rows; } - if (next_tuple_offset != -1) *tuple_found = true; - } while (!*tuple_found && !eosr); + if (next_tuple_offset != -1) tuple_found = true; + } while (!tuple_found && !eosr); // Special case: if the first delimiter is at the end of the current buffer, it's // possible it's a split "\r\n" delimiter. - if (*tuple_found && byte_buffer_ptr_ == byte_buffer_end_) { + if (tuple_found && byte_buffer_ptr_ == byte_buffer_end_) { bool split_delimiter; RETURN_IF_ERROR(CheckForSplitDelimiter(&split_delimiter)); if (split_delimiter) { if (eosr) { // Split delimiter at the end of the scan range. The next tuple is considered // part of the next scan range, so we report no tuple found. - *tuple_found = false; + tuple_found = false; } else { // Split delimiter at the end of the current buffer, but not eosr. 
Advance to // the correct position in the next buffer. - RETURN_IF_ERROR(FillByteBuffer(&eosr)); + RETURN_IF_ERROR(FillByteBuffer(pool, &eosr)); DCHECK_GT(byte_buffer_read_size_, 0); DCHECK_EQ(*byte_buffer_ptr_, '\n'); byte_buffer_ptr_ += 1; @@ -648,7 +694,7 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) { } } if (num_rows_to_skip > 1 && num_skipped_rows != num_rows_to_skip) { - DCHECK(!*tuple_found); + DCHECK(!tuple_found); stringstream ss; ss << "Could only skip " << num_skipped_rows << " header lines in first scan range " << "but expected " << num_rows_to_skip << ". Try increasing " @@ -656,6 +702,7 @@ Status HdfsTextScanner::FindFirstTuple(bool* tuple_found) { return Status(ss.str()); } } + if (tuple_found) scan_state_ = FIRST_TUPLE_FOUND; DCHECK(delimited_text_parser_->AtTupleStart()); return Status::OK(); } @@ -695,17 +742,17 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) { // is then injected into the cross-compiled driving function, WriteAlignedTuples(). Status HdfsTextScanner::Codegen(HdfsScanNodeBase* node, const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) { - *write_aligned_tuples_fn = NULL; + *write_aligned_tuples_fn = nullptr; DCHECK(node->runtime_state()->ShouldCodegen()); LlvmCodeGen* codegen = node->runtime_state()->codegen(); - DCHECK(codegen != NULL); + DCHECK(codegen != nullptr); Function* write_complete_tuple_fn; RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjunct_ctxs, &write_complete_tuple_fn)); - DCHECK(write_complete_tuple_fn != NULL); + DCHECK(write_complete_tuple_fn != nullptr); RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn, write_aligned_tuples_fn)); - DCHECK(*write_aligned_tuples_fn != NULL); + DCHECK(*write_aligned_tuples_fn != nullptr); return Status::OK(); } @@ -721,22 +768,21 @@ Status HdfsTextScanner::Open(ScannerContext* context) { field_locations_.resize(state_->batch_size() * scan_node_->materialized_slots().size()); row_end_locations_.resize(state_->batch_size()); - // Allocate a new row batch. May fail if mem limit is exceeded. - RETURN_IF_ERROR(StartNewRowBatch()); + // Reset state for new scan range + RETURN_IF_ERROR(InitNewRange()); return Status::OK(); } -// This function writes fields in 'field_locations_' to the row_batch. This function -// deals with tuples that straddle batches. There are two cases: +// This function deals with tuples that straddle batches. There are two cases: // 1. There is already a partial tuple in flight from the previous time around. -// This tuple can either be fully materialized (all the materialized columns have -// been processed but we haven't seen the tuple delimiter yet) or only partially -// materialized. In this case num_tuples can be greater than num_fields +// This tuple can either be fully materialized (all the materialized columns have +// been processed but we haven't seen the tuple delimiter yet) or only partially +// materialized. In this case num_tuples can be greater than num_fields // 2. There is a non-fully materialized tuple at the end. The cols that have been -// parsed so far are written to 'tuple_' and the remained will be picked up (case 1) -// the next time around. -int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row, - int num_fields, int num_tuples) { +// parsed so far are written to 'tuple_' and the remaining ones will be picked up +// (case 1) the next time around. 
+int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool, + TupleRow* row) { SCOPED_TIMER(scan_node_->materialize_tuple_timer()); FieldLocation* fields = &field_locations_[0]; @@ -745,7 +791,7 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row, int num_tuples_materialized = 0; // Write remaining fields, if any, from the previous partial tuple. if (slot_idx_ != 0) { - DCHECK(tuple_ != NULL); + DCHECK(tuple_ != nullptr); int num_partial_fields = scan_node_->materialized_slots().size() - slot_idx_; // Corner case where there will be no materialized tuples but at least one col // worth of string data. In this case, make a deep copy and reuse the byte buffer. @@ -770,16 +816,16 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row, memcpy(tuple_, partial_tuple_, scan_node_->tuple_desc()->byte_size()); partial_tuple_empty_ = true; - tuple_row->SetTuple(scan_node_->tuple_idx(), tuple_); + row->SetTuple(scan_node_->tuple_idx(), tuple_); slot_idx_ = 0; ++num_tuples_processed; --num_tuples; - if (EvalConjuncts(tuple_row)) { + if (EvalConjuncts(row)) { ++num_tuples_materialized; tuple_ = next_tuple(tuple_byte_size_, tuple_); - tuple_row = next_row(tuple_row); + row = next_row(row); } } @@ -793,18 +839,18 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row, num_tuples : scan_node_->limit() - scan_node_->rows_returned(); int tuples_returned = 0; // Call jitted function if possible - if (write_tuples_fn_ != NULL) { + if (write_tuples_fn_ != nullptr) { // HdfsScanner::InitializeWriteTuplesFn() will skip codegen if there are string // slots and escape characters. TextConverter::WriteSlot() will be used instead. DCHECK(scan_node_->tuple_desc()->string_slots().empty() || delimited_text_parser_->escape_char() == '\0'); - tuples_returned = write_tuples_fn_(this, pool, tuple_row, - batch_->row_byte_size(), fields, num_tuples, max_added_tuples, - scan_node_->materialized_slots().size(), num_tuples_processed); + tuples_returned = write_tuples_fn_(this, pool, row, sizeof(Tuple*), fields, + num_tuples, max_added_tuples, scan_node_->materialized_slots().size(), + num_tuples_processed); } else { - tuples_returned = WriteAlignedTuples(pool, tuple_row, - batch_->row_byte_size(), fields, num_tuples, max_added_tuples, - scan_node_->materialized_slots().size(), num_tuples_processed); + tuples_returned = WriteAlignedTuples(pool, row, sizeof(Tuple*), fields, + num_tuples, max_added_tuples, scan_node_->materialized_slots().size(), + num_tuples_processed); } if (tuples_returned == -1) return 0; DCHECK_EQ(slot_idx_, 0); @@ -819,7 +865,7 @@ int HdfsTextScanner::WriteFields(MemPool* pool, TupleRow* tuple_row, // Write out the remaining slots (resulting in a partially materialized tuple) if (num_fields != 0) { - DCHECK(tuple_ != NULL); + DCHECK(tuple_ != nullptr); InitTuple(template_tuple_, partial_tuple_); // If there have been no materialized tuples at this point, copy string data // out of byte_buffer and reuse the byte_buffer. The copied data can be at @@ -836,7 +882,7 @@ Status HdfsTextScanner::CopyBoundaryField(FieldLocation* data, MemPool* pool) { int copy_len = needs_escape ? 
-data->len : data->len; int64_t total_len = copy_len + boundary_column_.len(); char* str_data = reinterpret_cast<char*>(pool->TryAllocate(total_len)); - if (UNLIKELY(str_data == NULL)) { + if (UNLIKELY(str_data == nullptr)) { string details = Substitute("HdfsTextScanner::CopyBoundaryField() failed to allocate " "$0 bytes.", total_len); return pool->mem_tracker()->MemLimitExceeded(state_, details, total_len); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/hdfs-text-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h index e68d45f..042cd18 100644 --- a/be/src/exec/hdfs-text-scanner.h +++ b/be/src/exec/hdfs-text-scanner.h @@ -36,7 +36,7 @@ struct HdfsFileDesc; /// This scanner handles text files split across multiple blocks/scan ranges. Note that /// the split can occur anywhere in the file, e.g. in the middle of a row. Each scanner /// starts materializing tuples right after the first row delimiter found in the scan -/// range, and stops at the first row delimiter occuring past the end of the scan +/// range, and stops at the first row delimiter occurring past the end of the scan /// range. If no delimiter is found in the scan range, the scanner doesn't materialize /// anything. This scheme ensures that every row is materialized by exactly one scanner. /// @@ -57,10 +57,10 @@ class HdfsTextScanner : public HdfsScanner { /// Issue io manager byte ranges for 'files'. static Status IssueInitialRanges(HdfsScanNodeBase* scan_node, - const std::vector<HdfsFileDesc*>& files); + const std::vector<HdfsFileDesc*>& files); /// Codegen WriteAlignedTuples(). Stores the resulting function in - /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise. + /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise. static Status Codegen(HdfsScanNodeBase* node, const std::vector<ExprContext*>& conjunct_ctxs, llvm::Function** write_aligned_tuples_fn); @@ -71,6 +71,8 @@ class HdfsTextScanner : public HdfsScanner { static const char* LLVM_CLASS_NAME; protected: + virtual Status GetNextInternal(RowBatch* row_batch); + /// Reset the scanner. This clears any partial state that needs to /// be cleared when starting or when restarting after an error. Status ResetScanner(); @@ -90,33 +92,52 @@ class HdfsTextScanner : public HdfsScanner { private: const static int NEXT_BLOCK_READ_SIZE = 64 * 1024; //bytes + /// The text scanner transitions through these states exactly in order. + enum TextScanState { + CONSTRUCTED, + SCAN_RANGE_INITIALIZED, + FIRST_TUPLE_FOUND, + PAST_SCAN_RANGE, + DONE + }; + /// Initializes this scanner for this context. The context maps to a single - /// scan range. + /// scan range. Advances the scan state to SCAN_RANGE_INITIALIZED. virtual Status InitNewRange(); /// Finds the start of the first tuple in this scan range and initializes - /// byte_buffer_ptr to be the next character (the start of the first tuple). If - /// there are no tuples starts in the entire range, *tuple_found is set to false - /// and no more processing needs to be done in this range (i.e. there are really large - /// columns) - Status FindFirstTuple(bool* tuple_found); - - /// Process the entire scan range, reading bytes from context and appending - /// materialized row batches to the scan node. *num_tuples returns the - /// number of tuples parsed. 
past_scan_range is true if this is processing - /// beyond the end of the scan range and this function should stop after - /// finding one tuple. - Status ProcessRange(int* num_tuples, bool past_scan_range); - - /// Reads past the end of the scan range for the next tuple end. - Status FinishScanRange(); + /// 'byte_buffer_ptr_' to point to the start of first tuple. Advances the scan state + /// to FIRST_TUPLE_FOUND, if successful. Otherwise, consumes the whole scan range + /// and does not update the scan state (e.g. if there are really large columns). + /// Only valid to call in scan state SCAN_RANGE_INITIALIZED. + Status FindFirstTuple(MemPool* pool); + + /// When in scan state FIRST_TUPLE_FOUND, starts or continues processing the scan range + /// by reading bytes from 'context_'. Adds materialized tuples that pass the conjuncts + /// to 'row_batch', and returns when 'row_batch' is at capacity. + /// When in scan state PAST_SCAN_RANGE, this function returns after parsing one tuple, + /// regardless of whether it passed the conjuncts. + /// *num_tuples returns the total number of tuples parsed, including tuples that did + /// not pass conjuncts. + /// Advances the scan state to PAST_SCAN_RANGE if all bytes in the scan range have been + /// processed. + /// Only valid to call in scan state FIRST_TUPLE_FOUND or PAST_SCAN_RANGE. + Status ProcessRange(RowBatch* row_batch, int* num_tuples); + + /// Reads past the end of the scan range for the next tuple end. If successful, + /// advances the scan state to DONE. Only valid to call in state PAST_SCAN_RANGE. + Status FinishScanRange(RowBatch* row_batch); /// Fills the next byte buffer from the context. This will block if there are no bytes /// ready. Updates byte_buffer_ptr_, byte_buffer_end_ and byte_buffer_read_size_. /// If num_bytes is 0, the scanner will read whatever is the io mgr buffer size, /// otherwise it will just read num_bytes. If we are reading compressed text, num_bytes - /// must be 0. - virtual Status FillByteBuffer(bool* eosr, int num_bytes = 0); + /// must be 0. Internally, calls the appropriate streaming or non-streaming + /// decompression functions FillByteBufferCompressedFile/Stream(). + /// If applicable, attaches decompression buffers from previous calls that might still + /// be referenced by returned batches to 'pool'. If 'pool' is nullptr the buffers are + /// freed instead. + virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0); /// Fills the next byte buffer from the compressed data in stream_ by reading the entire /// file, decompressing it, and setting the byte_buffer_ptr_ to the decompressed buffer. @@ -126,7 +147,9 @@ class HdfsTextScanner : public HdfsScanner { /// FillByteBufferCompressedFile(), the entire file does not need to be read at once. /// Buffers from stream_ are decompressed as they are read and byte_buffer_ptr_ is set /// to available decompressed data. - Status FillByteBufferCompressedStream(bool* eosr); + /// Attaches decompression buffers from previous calls that might still be referenced + /// by returned batches to 'pool'. If 'pool' is nullptr the buffers are freed instead. + Status FillByteBufferCompressedStream(MemPool* pool, bool* eosr); /// Used by FillByteBufferCompressedStream() to decompress data from 'stream_'. /// Returns COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS if it needs more input. @@ -150,21 +173,24 @@ class HdfsTextScanner : public HdfsScanner { /// memory limit is exceeded when allocating a new string. 
Status CopyBoundaryField(FieldLocation* data, MemPool* pool); - /// Writes the intermediate parsed data into slots, outputting - /// tuples to row_batch as they complete. + /// Writes intermediate parsed data into 'tuple_', evaluates conjuncts, and appends + /// surviving rows to 'row'. Advances 'tuple_' and 'row' as necessary. /// Input Parameters: - /// mempool: MemPool to allocate from for field data /// num_fields: Total number of fields contained in parsed_data_ /// num_tuples: Number of tuples in parsed_data_. This includes the potential /// partial tuple at the beginning of 'field_locations_'. - /// Returns the number of tuples added to the row batch. - int WriteFields(MemPool*, TupleRow* tuple_row_mem, int num_fields, int num_tuples); + /// pool: MemPool to allocate from for field data + /// Returns the number of rows added to the row batch. + int WriteFields(int num_fields, int num_tuples, MemPool* pool, TupleRow* row); /// Utility function to write out 'num_fields' to 'tuple_'. This is used to parse /// partial tuples. If copy_strings is true, strings from fields will be copied into /// the boundary pool. void WritePartialTuple(FieldLocation*, int num_fields, bool copy_strings); + /// Current state of this scanner. Advances through the states exactly in order. + TextScanState scan_state_; + /// Mem pool for boundary_row_ and boundary_column_. boost::scoped_ptr<MemPool> boundary_pool_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index 66f112d..335c921 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -92,7 +92,7 @@ ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) { } void ScannerContext::Stream::ReleaseCompletedResources(RowBatch* batch, bool done) { - DCHECK(batch != nullptr || done); + DCHECK(batch != nullptr || done || !contains_tuple_data_); if (done) { // Mark any pending resources as completed if (io_buffer_ != nullptr) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/be/src/exec/scanner-context.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h index 0f4e36f..ff3cfa8 100644 --- a/be/src/exec/scanner-context.h +++ b/be/src/exec/scanner-context.h @@ -257,8 +257,9 @@ class ScannerContext { /// If 'batch' is not NULL and 'contains_tuple_data_' is true, attaches all completed /// io buffers and the boundary mem pool to 'batch'. If 'done' is set, all in-flight /// resources are also attached or released. - /// If 'batch' is NULL then 'done' must be true. Such a call will release all - /// completed and in-flight resources. + /// If 'batch' is NULL then 'done' must be true or 'contains_tuple_data_' false. Such + /// a call will release all completed resources. If 'done' is true all in-flight + /// resources are also freed. void ReleaseCompletedResources(RowBatch* batch, bool done); /// Error-reporting functions. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/be8d1512/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 50d6bfe..cb6627f 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -221,7 +221,9 @@ public class HdfsScanNode extends ScanNode { // is currently only supported for Parquet. if (analyzer.getQueryOptions().isSetMt_dop() && analyzer.getQueryOptions().mt_dop > 0 && - fileFormats.size() == 1 && fileFormats.contains(HdfsFileFormat.PARQUET)) { + fileFormats.size() == 1 && + (fileFormats.contains(HdfsFileFormat.PARQUET) + || fileFormats.contains(HdfsFileFormat.TEXT))) { useMtScanNode_ = true; } else { useMtScanNode_ = false;
