http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 7af5d5d..11e5718 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -231,15 +231,15 @@ class HdfsScanNodeBase : public ScanNode { Tuple* InitTemplateTuple(const std::vector<ScalarExprEvaluator*>& value_evals, MemPool* pool, RuntimeState* state) const; - /// Returns the file desc for 'filename'. Returns NULL if filename is invalid. + /// Returns the file desc for 'filename'. Returns nullptr if filename is invalid. HdfsFileDesc* GetFileDesc(const std::string& filename); /// Sets the scanner specific metadata for 'filename'. Scanners can use this to store /// file header information. Thread safe. void SetFileMetadata(const std::string& filename, void* metadata); - /// Returns the scanner specific metadata for 'filename'. Returns NULL if there is no - /// metadata. Thread safe. + /// Returns the scanner specific metadata for 'filename'. Returns nullptr if there is + /// no metadata. Thread safe. void* GetFileMetadata(const std::string& filename); /// Called by scanners when a range is complete. Used to record progress. @@ -267,6 +267,9 @@ class HdfsScanNodeBase : public ScanNode { return materialized_slots().empty() && tuple_desc()->tuple_path().empty(); } + /// Transfers all memory from 'pool' to 'scan_node_pool_'. + virtual void TransferToScanNodePool(MemPool* pool); + /// map from volume id to <number of split, per volume split lengths> /// TODO: move this into some global .h, no need to include this file just for this /// typedef @@ -329,7 +332,7 @@ class HdfsScanNodeBase : public ScanNode { /// Map from partition ID to a template tuple (owned by scan_node_pool_) which has only /// the partition columns for that partition materialized. Used to filter files and scan - /// ranges on partition-column filters. Populated in Prepare(). + /// ranges on partition-column filters. Populated in Open(). boost::unordered_map<int64_t, Tuple*> partition_template_tuple_map_; /// Descriptor for the hdfs table, including partition and format metadata.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node-mt.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h index 7829d47..4ce12fe 100644 --- a/be/src/exec/hdfs-scan-node-mt.h +++ b/be/src/exec/hdfs-scan-node-mt.h @@ -40,9 +40,10 @@ class HdfsScanNodeMt : public HdfsScanNodeBase { HdfsScanNodeMt(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); ~HdfsScanNodeMt(); - virtual Status Prepare(RuntimeState* state); - virtual Status Open(RuntimeState* state); - virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos); + virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT; + virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT; + virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) + WARN_UNUSED_RESULT; virtual void Close(RuntimeState* state); virtual bool HasRowBatchQueue() const { return false; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 4cbf503..1ab87ac 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -17,6 +17,7 @@ #include "exec/hdfs-scan-node.h" +#include <memory> #include <sstream> #include "common/logging.h" @@ -111,10 +112,10 @@ Status HdfsScanNode::GetNextInternal( return Status::OK(); } *eos = false; - RowBatch* materialized_batch = materialized_row_batches_->GetBatch(); + unique_ptr<RowBatch> materialized_batch = materialized_row_batches_->GetBatch(); if (materialized_batch != NULL) { num_owned_io_buffers_.Add(-materialized_batch->num_io_buffers()); - row_batch->AcquireState(materialized_batch); + row_batch->AcquireState(materialized_batch.get()); // Update the number of materialized rows now instead of when they are materialized. // This means that scanners might process and queue up more rows than are necessary // for the limit case but we want to avoid the synchronized writes to @@ -132,7 +133,7 @@ Status HdfsScanNode::GetNextInternal( SetDone(); } DCHECK_EQ(materialized_batch->num_io_buffers(), 0); - delete materialized_batch; + materialized_batch.reset(); return Status::OK(); } // The RowBatchQueue was shutdown either because all scan ranges are complete or a @@ -248,12 +249,12 @@ void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type, void HdfsScanNode::TransferToScanNodePool(MemPool* pool) { unique_lock<mutex> l(lock_); - scan_node_pool_->AcquireData(pool, false); + HdfsScanNodeBase::TransferToScanNodePool(pool); } -void HdfsScanNode::AddMaterializedRowBatch(RowBatch* row_batch) { - InitNullCollectionValues(row_batch); - materialized_row_batches_->AddBatch(row_batch); +void HdfsScanNode::AddMaterializedRowBatch(unique_ptr<RowBatch> row_batch) { + InitNullCollectionValues(row_batch.get()); + materialized_row_batches_->AddBatch(move(row_batch)); } Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges, @@ -541,9 +542,8 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs, VLOG_QUERY << ss.str(); } - // Transfer the remaining resources to the final row batch (if any) and add it to - // the row batch queue. - scanner->Close(scanner->batch()); + // Transfer remaining resources to a final batch and add it to the row batch queue. + scanner->Close(); return status; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h index 41647da..b056e2e 100644 --- a/be/src/exec/hdfs-scan-node.h +++ b/be/src/exec/hdfs-scan-node.h @@ -20,6 +20,7 @@ #define IMPALA_EXEC_HDFS_SCAN_NODE_H_ #include <map> +#include <memory> #include <stdint.h> #include <vector> @@ -66,10 +67,11 @@ class HdfsScanNode : public HdfsScanNodeBase { HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); ~HdfsScanNode(); - virtual Status Init(const TPlanNode& tnode, RuntimeState* state); - virtual Status Prepare(RuntimeState* state); - virtual Status Open(RuntimeState* state); - virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos); + virtual Status Init(const TPlanNode& tnode, RuntimeState* state) WARN_UNUSED_RESULT; + virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT; + virtual Status Open(RuntimeState* state) WARN_UNUSED_RESULT; + virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) + WARN_UNUSED_RESULT; virtual void Close(RuntimeState* state); virtual bool HasRowBatchQueue() const { return true; } @@ -78,12 +80,12 @@ class HdfsScanNode : public HdfsScanNodeBase { /// Adds ranges to the io mgr queue and starts up new scanner threads if possible. virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges, - int num_files_queued); + int num_files_queued) WARN_UNUSED_RESULT; /// Adds a materialized row batch for the scan node. This is called from scanner /// threads. /// This function will block if materialized_row_batches_ is full. - void AddMaterializedRowBatch(RowBatch* row_batch); + void AddMaterializedRowBatch(std::unique_ptr<RowBatch> row_batch); /// Called by scanners when a range is complete. Used to record progress and set done_. /// This *must* only be called after a scanner has completely finished its @@ -93,8 +95,8 @@ class HdfsScanNode : public HdfsScanNodeBase { virtual void RangeComplete(const THdfsFileFormat::type& file_type, const std::vector<THdfsCompression::type>& compression_type); - /// Acquires all allocations from pool into scan_node_pool_. Thread-safe. - void TransferToScanNodePool(MemPool* pool); + /// Transfers all memory from 'pool' to 'scan_node_pool_'. + virtual void TransferToScanNodePool(MemPool* pool); private: /// Released when initial ranges are issued in the first call to GetNext(). @@ -164,7 +166,7 @@ class HdfsScanNode : public HdfsScanNodeBase { /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows /// in this split. Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs, - DiskIoMgr::ScanRange* scan_range); + DiskIoMgr::ScanRange* scan_range) WARN_UNUSED_RESULT; /// Returns true if there is enough memory (against the mem tracker limits) to /// have a scanner thread. @@ -175,7 +177,8 @@ class HdfsScanNode : public HdfsScanNodeBase { bool EnoughMemoryForScannerThread(bool new_thread); /// Checks for eos conditions and returns batches from materialized_row_batches_. - Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos); + Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos) + WARN_UNUSED_RESULT; /// sets done_ to true and triggers threads to cleanup. Cannot be called with /// any locks taken. Calling it repeatedly ignores subsequent calls. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index 991c40f..0670f81 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -60,44 +60,16 @@ const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner"; HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state) : scan_node_(scan_node), state_(state), - context_(NULL), - stream_(NULL), - eos_(false), - is_closed_(false), expr_mem_pool_(new MemPool(scan_node->expr_mem_tracker())), - conjunct_evals_(NULL), template_tuple_pool_(new MemPool(scan_node->mem_tracker())), - template_tuple_(NULL), tuple_byte_size_(scan_node->tuple_desc()->byte_size()), - tuple_(NULL), - batch_(NULL), - tuple_mem_(NULL), - parse_status_(Status::OK()), - decompression_type_(THdfsCompression::NONE), - data_buffer_pool_(new MemPool(scan_node->mem_tracker())), - decompress_timer_(NULL), - write_tuples_fn_(NULL) { + data_buffer_pool_(new MemPool(scan_node->mem_tracker())) { } HdfsScanner::HdfsScanner() - : scan_node_(NULL), - state_(NULL), - context_(NULL), - stream_(NULL), - eos_(false), - is_closed_(false), - conjunct_evals_(NULL), - template_tuple_pool_(NULL), - template_tuple_(NULL), - tuple_byte_size_(-1), - tuple_(NULL), - batch_(NULL), - tuple_mem_(NULL), - parse_status_(Status::OK()), - decompression_type_(THdfsCompression::NONE), - data_buffer_pool_(NULL), - decompress_timer_(NULL), - write_tuples_fn_(NULL) { + : scan_node_(nullptr), + state_(nullptr), + tuple_byte_size_(0) { DCHECK(TestInfo::is_test()); } @@ -142,9 +114,31 @@ Status HdfsScanner::Open(ScannerContext* context) { return Status::OK(); } -void HdfsScanner::Close(RowBatch* row_batch) { +Status HdfsScanner::ProcessSplit() { + DCHECK(scan_node_->HasRowBatchQueue()); + HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_); + do { + unique_ptr<RowBatch> batch = std::make_unique<RowBatch>(scan_node_->row_desc(), + state_->batch_size(), scan_node_->mem_tracker()); + Status status = GetNextInternal(batch.get()); + // Always add batch to the queue because it may contain data referenced by previously + // appended batches. + scan_node->AddMaterializedRowBatch(move(batch)); + RETURN_IF_ERROR(status); + } while (!eos_ && !scan_node_->ReachedLimit()); + return Status::OK(); +} + +void HdfsScanner::Close() { + DCHECK(scan_node_->HasRowBatchQueue()); + RowBatch* final_batch = new RowBatch(scan_node_->row_desc(), state_->batch_size(), + scan_node_->mem_tracker()); + Close(final_batch); +} + +void HdfsScanner::CloseInternal() { DCHECK(!is_closed_); - if (decompressor_.get() != NULL) { + if (decompressor_.get() != nullptr) { decompressor_->Close(); decompressor_.reset(); } @@ -153,7 +147,7 @@ void HdfsScanner::Close(RowBatch* row_batch) { } expr_mem_pool_->FreeAll(); obj_pool_.Clear(); - stream_ = NULL; + stream_ = nullptr; context_->ClearStreams(); is_closed_ = true; } @@ -179,26 +173,6 @@ Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition, return Status::OK(); } -Status HdfsScanner::StartNewRowBatch() { - DCHECK(scan_node_->HasRowBatchQueue()); - batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(), - scan_node_->mem_tracker()); - int64_t tuple_buffer_size; - RETURN_IF_ERROR( - batch_->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_mem_)); - return Status::OK(); -} - -int HdfsScanner::GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem) { - DCHECK(scan_node_->HasRowBatchQueue()); - DCHECK(batch_ != NULL); - DCHECK_GT(batch_->capacity(), batch_->num_rows()); - *pool = batch_->tuple_data_pool(); - *tuple_mem = reinterpret_cast<Tuple*>(tuple_mem_); - *tuple_row_mem = batch_->GetRow(batch_->AddRow()); - return batch_->capacity() - batch_->num_rows(); -} - Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows) { int num_tuples; @@ -210,10 +184,7 @@ Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool return Status::OK(); } -Status HdfsScanner::CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row_batch) { - DCHECK(batch_ != NULL || !scan_node_->HasRowBatchQueue()); - DCHECK(batch_ == row_batch || !scan_node_->HasRowBatchQueue()); - DCHECK(!enqueue_if_full || scan_node_->HasRowBatchQueue()); +Status HdfsScanner::CommitRows(int num_rows, RowBatch* row_batch) { DCHECK_LE(num_rows, row_batch->capacity() - row_batch->num_rows()); row_batch->CommitRows(num_rows); tuple_mem_ += static_cast<int64_t>(scan_node_->tuple_desc()->byte_size()) * num_rows; @@ -224,10 +195,6 @@ Status HdfsScanner::CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row // if no rows passed predicates. if (row_batch->AtCapacity() || context_->num_completed_io_buffers() > 0) { context_->ReleaseCompletedResources(row_batch, /* done */ false); - if (enqueue_if_full) { - static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch); - RETURN_IF_ERROR(StartNewRowBatch()); - } } if (context_->cancelled()) return Status::CANCELLED; // Check for UDF errors. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h index cb9dfeb..f2f3407 100644 --- a/be/src/exec/hdfs-scanner.h +++ b/be/src/exec/hdfs-scanner.h @@ -22,33 +22,27 @@ #include <vector> #include <memory> #include <stdint.h> -#include <boost/regex.hpp> #include <boost/scoped_ptr.hpp> #include "codegen/impala-ir.h" #include "common/object-pool.h" +#include "common/status.h" #include "exec/hdfs-scan-node-base.h" -#include "exec/scan-node.h" #include "exec/scanner-context.h" -#include "runtime/disk-io-mgr.h" #include "runtime/row-batch.h" #include "runtime/tuple.h" namespace impala { +class Codec; class CollectionValueBuilder; class Compression; -class DescriptorTbl; class Expr; class HdfsPartitionDescriptor; class MemPool; -class SlotDescriptor; -class Status; class TextConverter; class TupleDescriptor; -class TPlanNode; -class TScanRange; -class Codec; +class SlotDescriptor; /// Intermediate structure used for two pass parsing approach. In the first pass, /// the FieldLocation structs are filled out and contain where all the fields start and @@ -118,7 +112,7 @@ class HdfsScanner { virtual ~HdfsScanner(); /// One-time initialisation of state that is constant across scan ranges. - virtual Status Open(ScannerContext* context); + virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT; /// Returns the next row batch from this scanner's split. /// Recoverable errors are logged to the runtime state. Only returns a non-OK status @@ -127,7 +121,7 @@ class HdfsScanner { /// The memory referenced by the tuples is valid until this or any subsequently /// returned batch is reset or destroyed. /// Only valid to call if the parent scan node is single-threaded. - Status GetNext(RowBatch* row_batch) { + Status GetNext(RowBatch* row_batch) WARN_UNUSED_RESULT { DCHECK(!scan_node_->HasRowBatchQueue()); return GetNextInternal(row_batch); } @@ -136,14 +130,26 @@ class HdfsScanner { /// initialized with the split data (e.g. template tuple, partition descriptor, etc). /// This function should only return on error or end of scan range. /// Only valid to call if the parent scan node is multi-threaded. - virtual Status ProcessSplit() = 0; + virtual Status ProcessSplit() WARN_UNUSED_RESULT; + + /// Creates a new row batch and transfers the ownership of memory backing returned + /// tuples to it by calling Close(RowBatch). That last batch is added to the row batch + /// queue. Only valid to call if HasRowBatchQueue(). + void Close(); /// Transfers the ownership of memory backing returned tuples such as IO buffers /// and memory in mem pools to the given row batch. If the row batch is NULL, /// those resources are released instead. In any case, releases all other resources /// that are not backing returned rows (e.g. temporary decompression buffers). /// This function is not idempotent and must only be called once. - virtual void Close(RowBatch* row_batch); + virtual void Close(RowBatch* row_batch) = 0; + + /// Helper function that frees resources common to all scanner subclasses like the + /// 'decompressor_', 'context_', 'obj_pool_', etc. Should only be called once the last + /// row batch has been attached to the row batch queue (if applicable) to avoid freeing + /// memory that might be referenced by the last batch. + /// Only valid to call if 'is_closed_' is false. Sets 'is_closed_' to true. + void CloseInternal(); /// Only valid to call if the parent scan node is single-threaded. bool eos() const { @@ -151,12 +157,6 @@ class HdfsScanner { return eos_; } - /// Only valid to call if the parent scan node is multi-threaded. - RowBatch* batch() const { - DCHECK(scan_node_->HasRowBatchQueue()); - return batch_; - } - /// Scanner subclasses must implement these static functions as well. Unfortunately, /// c++ does not allow static virtual functions. @@ -173,7 +173,7 @@ class HdfsScanner { /// should be issued to the io mgr. This is one range for the metadata and one /// range for each column, for each split. /// This function is how scanners can pick their strategy. - /// void IssueInitialRanges(HdfsScanNode* scan_node, + /// void IssueInitialRanges(HdfsScanNodeBase* scan_node, /// const std::vector<HdfsFileDesc*>& files); /// Codegen all functions for this scanner. The codegen'd function is specific to @@ -191,20 +191,19 @@ class HdfsScanner { RuntimeState* state_; /// Context for this scanner - ScannerContext* context_; + ScannerContext* context_ = nullptr; /// Object pool for objects with same lifetime as scanner. ObjectPool obj_pool_; /// The first stream for context_ - ScannerContext::Stream* stream_; + ScannerContext::Stream* stream_ = nullptr; /// Set if this scanner has processed all ranges and will not produce more rows. - /// Only relevant when calling the GetNext() interface. - bool eos_; + bool eos_ = false; /// Starts as false and is set to true in Close(). - bool is_closed_; + bool is_closed_ = false; /// MemPool used for expression evaluators in this scanner. Need to be local /// to each scanner as MemPool is not thread safe. @@ -217,7 +216,7 @@ class HdfsScanner { // Convenience reference to conjuncts_evals_map_[scan_node_->tuple_idx()] for // scanners that do not support nested types. - const std::vector<ScalarExprEvaluator*>* conjunct_evals_; + const std::vector<ScalarExprEvaluator*>* conjunct_evals_ = nullptr; // Clones of the conjuncts' evaluators in scan_node_->dict_filter_conjuncts_map(). typedef std::map<SlotId, std::vector<ScalarExprEvaluator*>> DictFilterConjunctsMap; @@ -242,23 +241,16 @@ class HdfsScanner { /// Convenience variable set to the top-level template tuple /// (i.e. template_tuple_map_[scan_node_->tuple_desc()]). - Tuple* template_tuple_; + Tuple* template_tuple_ = nullptr; /// Fixed size of each top-level tuple, in bytes const int32_t tuple_byte_size_; - /// Current tuple pointer into tuple_mem_. - Tuple* tuple_; + /// Current tuple pointer into 'tuple_mem_'. + Tuple* tuple_ = nullptr; - /// The current row batch being populated. Creating new row batches, attaching context - /// resources, and handing off to the scan node is handled by this class in CommitRows(), - /// but AttachPool() must be called by scanner subclasses to attach any memory allocated - /// by that subclass. All row batches created by this class are transferred to the scan - /// node (i.e., all batches are ultimately owned by the scan node). - RowBatch* batch_; - - /// The tuple memory of batch_. - uint8_t* tuple_mem_; + /// The tuple memory backing 'tuple_'. + uint8_t* tuple_mem_ = nullptr; /// Helper class for converting text to other types; boost::scoped_ptr<TextConverter> text_converter_; @@ -267,31 +259,31 @@ class HdfsScanner { /// This significantly minimizes the cross compile dependencies for llvm since status /// objects inline a bunch of string functions. Also, status objects aren't extremely /// cheap to create and destroy. - Status parse_status_; + Status parse_status_ = Status::OK(); /// Decompressor class to use, if any. boost::scoped_ptr<Codec> decompressor_; /// The most recently used decompression type. - THdfsCompression::type decompression_type_; + THdfsCompression::type decompression_type_ = THdfsCompression::NONE; /// Pool to allocate per data block memory. This should be used with the /// decompressor and any other per data block allocations. boost::scoped_ptr<MemPool> data_buffer_pool_; /// Time spent decompressing bytes. - RuntimeProfile::Counter* decompress_timer_; + RuntimeProfile::Counter* decompress_timer_ = nullptr; /// Matching typedef for WriteAlignedTuples for codegen. Refer to comments for /// that function. typedef int (*WriteTuplesFn)(HdfsScanner*, MemPool*, TupleRow*, int, FieldLocation*, int, int, int, int); /// Jitted write tuples function pointer. Null if codegen is disabled. - WriteTuplesFn write_tuples_fn_; + WriteTuplesFn write_tuples_fn_ = nullptr; /// Implements GetNext(). Should be overridden by subclasses. /// Only valid to call if the parent scan node is multi-threaded. - virtual Status GetNextInternal(RowBatch* row_batch) { + virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT { DCHECK(false) << "GetNextInternal() not implemented for this scanner type."; return Status::OK(); } @@ -301,25 +293,10 @@ class HdfsScanner { /// - type - type for this scanner /// - scanner_name - debug string name for this scanner (e.g. HdfsTextScanner) Status InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition, - THdfsFileFormat::type type, const std::string& scanner_name); - - /// Set 'batch_' to a new row batch and update 'tuple_mem_' accordingly. - /// Only valid to call if the parent scan node is multi-threaded. - Status StartNewRowBatch(); + THdfsFileFormat::type type, const std::string& scanner_name) WARN_UNUSED_RESULT; /// Reset internal state for a new scan range. - virtual Status InitNewRange() = 0; - - /// Gets memory for outputting tuples into batch_. - /// *pool is the mem pool that should be used for memory allocated for those tuples. - /// *tuple_mem should be the location to output tuples, and - /// *tuple_row_mem for outputting tuple rows. - /// Returns the maximum number of tuples/tuple rows that can be output (before the - /// current row batch is complete and a new one is allocated). - /// Memory returned from this call is invalidated after calling CommitRows(). - /// Callers must call GetMemory() again after calling this function. - /// Only valid to call if the parent scan node is multi-threaded. - int GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem); + virtual Status InitNewRange() WARN_UNUSED_RESULT = 0; /// Gets memory for outputting tuples into the CollectionValue being constructed via /// 'builder'. If memory limit is exceeded, an error status is returned. Otherwise, @@ -330,40 +307,13 @@ class HdfsScanner { /// the next tuple. This also means its unnecessary to call /// (*tuple_row_mem)->SetTuple(). Status GetCollectionMemory(CollectionValueBuilder* builder, MemPool** pool, - Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows); + Tuple** tuple_mem, TupleRow** tuple_row_mem, int64_t* num_rows) WARN_UNUSED_RESULT; /// Commits 'num_rows' to 'row_batch'. Advances 'tuple_mem_' and 'tuple_' accordingly. /// Attaches completed resources from 'context_' to 'row_batch' if necessary. - /// Frees local expr allocations. - /// If 'enqueue_if_full' is true and 'row_batch' is at capacity after committing the - /// rows, then 'row_batch' is added to the queue, and a new batch is created with - /// StartNewRowBatch(). It is only valid to pass true for 'enqueue_if_full' if the - /// parent parent scan node is multi-threaded. - /// Returns non-OK if 'context_' is cancelled or the query status in 'state_' is - /// non-OK. - Status CommitRows(int num_rows, bool enqueue_if_full, RowBatch* row_batch); - - /// Calls the above CommitRows() passing true for 'queue_if_full', and 'batch_' as the - /// row batch. Only valid to call if the parent scan node is multi-threaded. - Status CommitRows(int num_rows) { - DCHECK(scan_node_->HasRowBatchQueue()); - return CommitRows(num_rows, true, batch_); - } - - /// Release all memory in 'pool' to batch_. If 'commit_batch' is true, the row batch - /// will be committed. 'commit_batch' should be true if the attached pool is expected - /// to be non-trivial (i.e. a decompression buffer) to minimize scanner mem usage. - /// Can return an error status if 'commit_batch' is true and allocating the next - /// batch fails, or if the query hit an error or is cancelled. Only valid to call if - /// the parent scan node is multi-threaded. - Status AttachPool(MemPool* pool, bool commit_batch) { - DCHECK(scan_node_->HasRowBatchQueue()); - DCHECK(batch_ != NULL); - DCHECK(pool != NULL); - batch_->tuple_data_pool()->AcquireData(pool, false); - if (commit_batch) RETURN_IF_ERROR(CommitRows(0)); - return Status::OK(); - } + /// Frees local expr allocations. Returns non-OK if 'context_' is cancelled or the + /// query status in 'state_' is non-OK. + Status CommitRows(int num_rows, RowBatch* row_batch) WARN_UNUSED_RESULT; /// Convenience function for evaluating conjuncts using this scanner's ScalarExprEvaluators. /// This must always be inlined so we can correctly replace the call to @@ -395,8 +345,8 @@ class HdfsScanner { /// Update the decompressor_ object given a compression type or codec name. Depending on /// the old compression type and the new one, it may close the old decompressor and/or /// create a new one of different type. - Status UpdateDecompressor(const THdfsCompression::type& compression); - Status UpdateDecompressor(const std::string& codec); + Status UpdateDecompressor(const THdfsCompression::type& compression) WARN_UNUSED_RESULT; + Status UpdateDecompressor(const std::string& codec) WARN_UNUSED_RESULT; /// Utility function to report parse errors for each field. /// If errors[i] is nonzero, fields[i] had a parse error. @@ -405,10 +355,10 @@ class HdfsScanner { /// This is called from WriteAlignedTuples. bool ReportTupleParseError(FieldLocation* fields, uint8_t* errors); - /// Triggers debug action of the scan node. This is currently used by parquet column - /// readers to exercise various failure paths in parquet scanner. Returns the status + /// Triggers debug action of the scan node. This is currently used by Parquet column + /// readers to exercise various failure paths in Parquet scanner. Returns the status /// returned by the scan node's TriggerDebugAction(). - Status ScannerDebugAction() { + Status ScannerDebugAction() WARN_UNUSED_RESULT { return scan_node_->ScanNodeDebugAction(TExecNodePhase::GETNEXT_SCANNER); } @@ -435,21 +385,23 @@ class HdfsScanner { /// TODO: revisit this bool WriteCompleteTuple(MemPool* pool, FieldLocation* fields, Tuple* tuple, TupleRow* tuple_row, Tuple* template_tuple, uint8_t* error_fields, - uint8_t* error_in_row); + uint8_t* error_in_row) WARN_UNUSED_RESULT; /// Codegen function to replace WriteCompleteTuple. Should behave identically /// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn' /// if codegen was successful or NULL otherwise. static Status CodegenWriteCompleteTuple(HdfsScanNodeBase* node, LlvmCodeGen* codegen, const std::vector<ScalarExpr*>& conjuncts, - llvm::Function** write_complete_tuple_fn); + llvm::Function** write_complete_tuple_fn) + WARN_UNUSED_RESULT; /// Codegen function to replace WriteAlignedTuples. WriteAlignedTuples is cross /// compiled to IR. This function loads the precompiled IR function, modifies it, /// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was /// successful or NULL otherwise. static Status CodegenWriteAlignedTuples(HdfsScanNodeBase*, LlvmCodeGen*, - llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn); + llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn) + WARN_UNUSED_RESULT; /// Report parse error for column @ desc. If abort_on_error is true, sets /// parse_status_ to the error message. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc index 355a554..9b66432 100644 --- a/be/src/exec/hdfs-sequence-scanner.cc +++ b/be/src/exec/hdfs-sequence-scanner.cc @@ -42,10 +42,7 @@ const uint8_t HdfsSequenceScanner::SEQFILE_VERSION_HEADER[4] = {'S', 'E', 'Q', 6 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_ HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNodeBase* scan_node, - RuntimeState* state) - : BaseSequenceScanner(scan_node, state), - unparsed_data_buffer_(NULL), - num_buffered_records_in_compressed_block_(0) { + RuntimeState* state) : BaseSequenceScanner(scan_node, state) { } HdfsSequenceScanner::~HdfsSequenceScanner() { @@ -54,22 +51,22 @@ HdfsSequenceScanner::~HdfsSequenceScanner() { // Codegen for materialized parsed data into tuples. Status HdfsSequenceScanner::Codegen(HdfsScanNodeBase* node, const vector<ScalarExpr*>& conjuncts, Function** write_aligned_tuples_fn) { - *write_aligned_tuples_fn = NULL; + *write_aligned_tuples_fn = nullptr; DCHECK(node->runtime_state()->ShouldCodegen()); LlvmCodeGen* codegen = node->runtime_state()->codegen(); - DCHECK(codegen != NULL); + DCHECK(codegen != nullptr); Function* write_complete_tuple_fn; RETURN_IF_ERROR(CodegenWriteCompleteTuple(node, codegen, conjuncts, &write_complete_tuple_fn)); - DCHECK(write_complete_tuple_fn != NULL); + DCHECK(write_complete_tuple_fn != nullptr); RETURN_IF_ERROR(CodegenWriteAlignedTuples(node, codegen, write_complete_tuple_fn, write_aligned_tuples_fn)); - DCHECK(*write_aligned_tuples_fn != NULL); + DCHECK(*write_aligned_tuples_fn != nullptr); return Status::OK(); } Status HdfsSequenceScanner::InitNewRange() { - DCHECK(header_ != NULL); + DCHECK(header_ != nullptr); only_parsing_header_ = false; HdfsPartitionDescriptor* hdfs_partition = context_->partition_descriptor(); @@ -168,23 +165,34 @@ inline Status HdfsSequenceScanner::GetRecord(uint8_t** record_ptr, // 3. Read the sync indicator and check the sync block // This mimics the technique for text. // This function only returns on error or when the entire scan range is complete. -Status HdfsSequenceScanner::ProcessBlockCompressedScanRange() { +Status HdfsSequenceScanner::ProcessBlockCompressedScanRange(RowBatch* row_batch) { DCHECK(header_->is_compressed); - while (!finished()) { - if (scan_node_->ReachedLimit()) return Status::OK(); - + if (num_buffered_records_in_compressed_block_ == 0) { + // We are reading a new compressed block. Pass the previous buffer pool bytes to the + // batch. We don't need them anymore. + if (!decompressor_->reuse_output_buffer()) { + row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false); + RETURN_IF_ERROR(CommitRows(0, row_batch)); + if (row_batch->AtCapacity()) return Status::OK(); + } // Step 1 RETURN_IF_ERROR(ReadCompressedBlock()); if (num_buffered_records_in_compressed_block_ < 0) return parse_status_; + } - // Step 2 - while (num_buffered_records_in_compressed_block_ > 0) { - RETURN_IF_ERROR(ProcessDecompressedBlock()); - } + // Step 2 + while (num_buffered_records_in_compressed_block_ > 0) { + RETURN_IF_ERROR(ProcessDecompressedBlock(row_batch)); + if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break; + } - // SequenceFiles don't end with syncs - if (stream_->eof()) return Status::OK(); + if (num_buffered_records_in_compressed_block_ == 0) { + // SequenceFiles don't end with syncs. + if (stream_->eof()) { + eos_ = true; + return Status::OK(); + } // Step 3 int sync_indicator; @@ -193,8 +201,8 @@ Status HdfsSequenceScanner::ProcessBlockCompressedScanRange() { if (state_->LogHasSpace()) { stringstream ss; ss << "Expecting sync indicator (-1) at file offset " - << (stream_->file_offset() - sizeof(int)) << ". " - << "Sync indicator found " << sync_indicator << "."; + << (stream_->file_offset() - sizeof(int)) << ". " + << "Sync indicator found " << sync_indicator << "."; state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str())); } return Status("Bad sync hash"); @@ -205,18 +213,17 @@ Status HdfsSequenceScanner::ProcessBlockCompressedScanRange() { return Status::OK(); } -Status HdfsSequenceScanner::ProcessDecompressedBlock() { - MemPool* pool; - TupleRow* tuple_row; - int64_t max_tuples = GetMemory(&pool, &tuple_, &tuple_row); +Status HdfsSequenceScanner::ProcessDecompressedBlock(RowBatch* row_batch) { + int64_t max_tuples = row_batch->capacity() - row_batch->num_rows(); int num_to_process = min(max_tuples, num_buffered_records_in_compressed_block_); num_buffered_records_in_compressed_block_ -= num_to_process; + TupleRow* tuple_row = row_batch->GetRow(row_batch->AddRow()); if (scan_node_->materialized_slots().empty()) { // Handle case where there are no slots to materialize (e.g. count(*)) num_to_process = WriteTemplateTuples(tuple_row, num_to_process); COUNTER_ADD(scan_node_->rows_read_counter(), num_to_process); - RETURN_IF_ERROR(CommitRows(num_to_process)); + RETURN_IF_ERROR(CommitRows(num_to_process, row_batch)); return Status::OK(); } @@ -261,34 +268,32 @@ Status HdfsSequenceScanner::ProcessDecompressedBlock() { SCOPED_TIMER(scan_node_->materialize_tuple_timer()); // Call jitted function if possible int tuples_returned; - if (write_tuples_fn_ != NULL) { + if (write_tuples_fn_ != nullptr) { // HdfsScanner::InitializeWriteTuplesFn() will skip codegen if there are string slots // and escape characters. TextConverter::WriteSlot() will be used instead. DCHECK(scan_node_->tuple_desc()->string_slots().empty() || delimited_text_parser_->escape_char() == '\0'); // last argument: seq always starts at record_location[0] - tuples_returned = write_tuples_fn_(this, pool, tuple_row, - batch_->row_byte_size(), field_locations_.data(), num_to_process, + tuples_returned = write_tuples_fn_(this, row_batch->tuple_data_pool(), tuple_row, + row_batch->row_byte_size(), field_locations_.data(), num_to_process, max_added_tuples, scan_node_->materialized_slots().size(), 0); } else { - tuples_returned = WriteAlignedTuples(pool, tuple_row, - batch_->row_byte_size(), field_locations_.data(), num_to_process, + tuples_returned = WriteAlignedTuples(row_batch->tuple_data_pool(), tuple_row, + row_batch->row_byte_size(), field_locations_.data(), num_to_process, max_added_tuples, scan_node_->materialized_slots().size(), 0); } if (tuples_returned == -1) return parse_status_; COUNTER_ADD(scan_node_->rows_read_counter(), num_to_process); - RETURN_IF_ERROR(CommitRows(tuples_returned)); + RETURN_IF_ERROR(CommitRows(tuples_returned, row_batch)); return Status::OK(); } -Status HdfsSequenceScanner::ProcessRange() { - num_buffered_records_in_compressed_block_ = 0; - +Status HdfsSequenceScanner::ProcessRange(RowBatch* row_batch) { SeqFileHeader* seq_header = reinterpret_cast<SeqFileHeader*>(header_); // Block compressed is handled separately to minimize function calls. if (seq_header->is_compressed && !seq_header->is_row_compressed) { - return ProcessBlockCompressedScanRange(); + return ProcessBlockCompressedScanRange(row_batch); } // We count the time here since there is too much overhead to do @@ -296,40 +301,30 @@ Status HdfsSequenceScanner::ProcessRange() { SCOPED_TIMER(scan_node_->materialize_tuple_timer()); int64_t num_rows_read = 0; - while (!finished()) { + const bool has_materialized_slots = !scan_node_->materialized_slots().empty(); + while (!eos_) { DCHECK_GT(record_locations_.size(), 0); - // Get the next compressed or uncompressed record. - RETURN_IF_ERROR( - GetRecord(&record_locations_[0].record, &record_locations_[0].len)); + TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow()); - MemPool* pool; - TupleRow* tuple_row_mem; - int max_tuples = GetMemory(&pool, &tuple_, &tuple_row_mem); - DCHECK_GT(max_tuples, 0); - - // Parse the current record. + // Get the next compressed or uncompressed record and parse it. + RETURN_IF_ERROR(GetRecord(&record_locations_[0].record, &record_locations_[0].len)); bool add_row = false; - - // Parse the current record. - if (scan_node_->materialized_slots().size() != 0) { + if (has_materialized_slots) { char* col_start; uint8_t* record_start = record_locations_[0].record; int num_tuples = 0; int num_fields = 0; char* row_end_loc; - uint8_t error_in_row = false; - RETURN_IF_ERROR(delimited_text_parser_->ParseFieldLocations( 1, record_locations_[0].len, reinterpret_cast<char**>(&record_start), &row_end_loc, field_locations_.data(), &num_tuples, &num_fields, &col_start)); - DCHECK(num_tuples == 1); + DCHECK_EQ(num_tuples, 1); + uint8_t error_in_row = false; uint8_t errors[num_fields]; memset(errors, 0, num_fields); - - add_row = WriteCompleteTuple(pool, field_locations_.data(), tuple_, tuple_row_mem, - template_tuple_, &errors[0], &error_in_row); - + add_row = WriteCompleteTuple(row_batch->tuple_data_pool(), field_locations_.data(), + tuple_, tuple_row_mem, template_tuple_, &errors[0], &error_in_row); if (UNLIKELY(error_in_row)) { ReportTupleParseError(field_locations_.data(), errors); RETURN_IF_ERROR(parse_status_); @@ -338,11 +333,13 @@ Status HdfsSequenceScanner::ProcessRange() { add_row = WriteTemplateTuples(tuple_row_mem, 1); } num_rows_read++; - if (add_row) RETURN_IF_ERROR(CommitRows(1)); - if (scan_node_->ReachedLimit()) break; + if (add_row) RETURN_IF_ERROR(CommitRows(1, row_batch)); - // Sequence files don't end with syncs - if (stream_->eof()) break; + // Sequence files don't end with syncs. + if (stream_->eof()) { + eos_ = true; + break; + } // Check for sync by looking for the marker that precedes syncs. int marker; @@ -351,6 +348,10 @@ Status HdfsSequenceScanner::ProcessRange() { RETURN_IF_FALSE(stream_->ReadInt(&marker, &parse_status_, /* peek */ false)); RETURN_IF_ERROR(ReadSync()); } + + // These checks must come after advancing past the next sync such that the stream is + // at the start of the next data block when this function is called again. + if (row_batch->AtCapacity() || scan_node_->ReachedLimit()) break; } COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read); @@ -454,19 +455,14 @@ Status HdfsSequenceScanner::ReadBlockHeader() { } Status HdfsSequenceScanner::ReadCompressedBlock() { - // We are reading a new compressed block. Pass the previous buffer pool - // bytes to the batch. We don't need them anymore. - if (!decompressor_->reuse_output_buffer()) { - RETURN_IF_ERROR(AttachPool(data_buffer_pool_.get(), true)); - } - + int64_t num_buffered_records; RETURN_IF_FALSE(stream_->ReadVLong( - &num_buffered_records_in_compressed_block_, &parse_status_)); - if (num_buffered_records_in_compressed_block_ < 0) { + &num_buffered_records, &parse_status_)); + if (num_buffered_records < 0) { if (state_->LogHasSpace()) { stringstream ss; ss << "Bad compressed block record count: " - << num_buffered_records_in_compressed_block_; + << num_buffered_records; state_->LogError(ErrorMsg(TErrorCode::GENERAL, ss.str())); } return Status("bad record count"); @@ -490,7 +486,7 @@ Status HdfsSequenceScanner::ReadCompressedBlock() { return Status(ss.str()); } - uint8_t* compressed_data = NULL; + uint8_t* compressed_data = nullptr; RETURN_IF_FALSE(stream_->ReadBytes(block_size, &compressed_data, &parse_status_)); { @@ -501,6 +497,6 @@ Status HdfsSequenceScanner::ReadCompressedBlock() { VLOG_FILE << "Decompressed " << block_size << " to " << len; next_record_in_compressed_block_ = unparsed_data_buffer_; } - + num_buffered_records_in_compressed_block_ = num_buffered_records; return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-sequence-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h index 01d5a68..4845edb 100644 --- a/be/src/exec/hdfs-sequence-scanner.h +++ b/be/src/exec/hdfs-sequence-scanner.h @@ -19,30 +19,29 @@ #ifndef IMPALA_EXEC_HDFS_SEQUENCE_SCANNER_H #define IMPALA_EXEC_HDFS_SEQUENCE_SCANNER_H -/// This scanner parses Sequence file located in HDFS, and writes the -/// content as tuples in the Impala in-memory representation of data, e.g. -/// (tuples, rows, row batches). -// +/// This scanner parses Sequence files and writes the content as tuples in the Impala +/// in-memory representation of data (tuples, rows, row batches). +/// /// TODO: Make the various sequence file formats behave more similarly. They should /// all have a structure similar to block compressed operating in batches rather than /// row at a time. -// +/// /// org.apache.hadoop.io.SequenceFile is the original SequenceFile implementation /// and should be viewed as the canonical definition of this format. If /// anything is unclear in this file you should consult the code in /// org.apache.hadoop.io.SequenceFile. -// +/// /// The following is a pseudo-BNF grammar for SequenceFile. Comments are prefixed /// with dashes: -// +/// /// seqfile ::= /// <file-header> /// <record-block>+ -// +/// /// record-block ::= /// <record>+ /// <file-sync-hash> -// +/// /// file-header ::= /// <file-version-header> /// <file-key-class-name> @@ -52,68 +51,68 @@ /// [<file-compression-codec-class>] /// <file-header-metadata> /// <file-sync-field> -// +/// /// file-version-header ::= Byte[4] {'S', 'E', 'Q', 6} -// +/// /// -- The name of the Java class responsible for reading the key buffer -// +/// /// file-key-class-name ::= /// Text {"org.apache.hadoop.io.BytesWritable"} -// +/// /// -- The name of the Java class responsible for reading the value buffer -// +/// /// -- We don't care what this is. /// file-value-class-name ::= -// +/// /// -- Boolean variable indicating whether or not the file uses compression /// -- for key/values in this file -// +/// /// file-is-compressed ::= Byte[1] -// +/// /// -- A boolean field indicating whether or not the file is block compressed. -// +/// /// file-is-block-compressed ::= Byte[1] {false} -// +/// /// -- The Java class name of the compression codec iff <file-is-compressed> /// -- is true. The named class must implement /// -- org.apache.hadoop.io.compress.CompressionCodec. /// -- The expected value is org.apache.hadoop.io.compress.GzipCodec. -// +/// /// file-compression-codec-class ::= Text -// +/// /// -- A collection of key-value pairs defining metadata values for the /// -- file. The Map is serialized using standard JDK serialization, i.e. /// -- an Int corresponding to the number of key-value pairs, followed by /// -- Text key and value pairs. -// +/// /// file-header-metadata ::= Map<Text, Text> -// +/// /// -- A 16 byte marker that is generated by the writer. This marker appears /// -- at regular intervals at the beginning of records or record blocks /// -- intended to enable readers to skip to a random part of the file /// -- the sync hash is preceeded by a length of -1, refered to as the sync marker -// +/// /// file-sync-hash ::= Byte[16] -// +/// /// -- Records are all of one type as determined by the compression bits in the header -// +/// /// record ::= /// <uncompressed-record> | /// <block-compressed-record> | /// <record-compressed-record> -// +/// /// uncompressed-record ::= /// <record-length> /// <key-length> /// <key> /// <value> -// +/// /// record-compressed-record ::= /// <record-length> /// <key-length> /// <key> /// <compressed-value> -// +/// /// block-compressed-record ::= /// <file-sync-field> /// <key-lengths-block-size> @@ -124,30 +123,30 @@ /// <value-lengths-block> /// <values-block-size> /// <values-block> -// +/// /// record-length := Int /// key-length := Int /// keys-lengths-block-size> := Int /// value-lengths-block-size> := Int -// +/// /// keys-block :: = Byte[keys-block-size] /// values-block :: = Byte[values-block-size] -// +/// /// -- The key-lengths and value-lengths blocks are are a sequence of lengths encoded /// -- in ZeroCompressedInteger (VInt) format. -// +/// /// key-lengths-block :: = Byte[key-lengths-block-size] /// value-lengths-block :: = Byte[value-lengths-block-size] -// +/// /// Byte ::= An eight-bit byte -// +/// /// VInt ::= Variable length integer. The high-order bit of each byte /// indicates whether more bytes remain to be read. The low-order seven /// bits are appended as increasingly more significant bits in the /// resulting integer value. -// +/// /// Int ::= A four-byte integer in big-endian format. -// +/// /// Text ::= VInt, Chars (Length prefixed UTF-8 characters) #include "exec/base-sequence-scanner.h" @@ -167,20 +166,21 @@ class HdfsSequenceScanner : public BaseSequenceScanner { virtual ~HdfsSequenceScanner(); /// Implementation of HdfsScanner interface. - virtual Status Open(ScannerContext* context); + virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT; /// Codegen WriteAlignedTuples(). Stores the resulting function in - /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise. + /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise. static Status Codegen(HdfsScanNodeBase* node, const std::vector<ScalarExpr*>& conjuncts, - llvm::Function** write_aligned_tuples_fn); + llvm::Function** write_aligned_tuples_fn) + WARN_UNUSED_RESULT; protected: /// Implementation of sequence container super class methods. virtual FileHeader* AllocateFileHeader(); - virtual Status ReadFileHeader(); - virtual Status InitNewRange(); - virtual Status ProcessRange(); + virtual Status ReadFileHeader() WARN_UNUSED_RESULT; + virtual Status InitNewRange() WARN_UNUSED_RESULT; + virtual Status ProcessRange(RowBatch* row_batch) WARN_UNUSED_RESULT; virtual THdfsFileFormat::type file_format() const { return THdfsFileFormat::SEQUENCE_FILE; @@ -189,35 +189,37 @@ class HdfsSequenceScanner : public BaseSequenceScanner { private: /// Maximum size of a compressed block. This is used to check for corrupted /// block size so we do not read the whole file before we detect the error. - const static int MAX_BLOCK_SIZE = (1024 * 1024 * 1024); + static const int64_t MAX_BLOCK_SIZE = 1024 * 1024 * 1024; /// The value class name located in the SeqFile Header. /// This is always "org.apache.hadoop.io.Text" static const char* const SEQFILE_VALUE_CLASS_NAME; - /// Read the record header. - /// Sets: - /// current_block_length_ - Status ReadBlockHeader(); + /// Reads the record header and sets 'current_block_length_'. + Status ReadBlockHeader() WARN_UNUSED_RESULT; - /// Process an entire block compressed scan range. Block compressed ranges are - /// more common and can be parsed more efficiently in larger pieces. - Status ProcessBlockCompressedScanRange(); + /// Processes or continues processing a block-compressed scan range, adding tuples + /// to 'row_batch'. Block-compressed ranges are common and can be parsed more + /// efficiently in larger pieces. + Status ProcessBlockCompressedScanRange(RowBatch* row_batch) WARN_UNUSED_RESULT; - /// Read a compressed block. Does NOT read sync or -1 marker preceding sync. - /// Decompress to unparsed_data_buffer_ allocated from unparsed_data_buffer_pool_. - Status ReadCompressedBlock(); + /// Reads a compressed block. Does NOT read sync or -1 marker preceding sync. + /// Decompresses the data into 'unparsed_data_buffer_' allocated from the + /// 'data_buffer_pool_' via the decompressor. + /// Sets 'num_buffered_records_in_compressed_block_' if decompression was + /// successful. + Status ReadCompressedBlock() WARN_UNUSED_RESULT; - /// Utility function for parsing next_record_in_compressed_block_. Called by - /// ProcessBlockCompressedScanRange. - Status ProcessDecompressedBlock(); + /// Utility function for parsing 'next_record_in_compressed_block_'. Called by + /// ProcessBlockCompressedScanRange(). + Status ProcessDecompressedBlock(RowBatch* row_batch) WARN_UNUSED_RESULT; - /// Read compressed or uncompressed records from the byte stream into memory - /// in unparsed_data_buffer_pool_. Not used for block compressed files. + /// Read a single record from the current position in 'stream_', decompressing + /// the record, if necessary. Not used for block compressed files. /// Output: - /// record_ptr: ponter to the record. + /// record_ptr: pointer to the record /// record_len: length of the record - Status GetRecord(uint8_t** record_ptr, int64_t *record_len); + Status GetRecord(uint8_t** record_ptr, int64_t* record_len) WARN_UNUSED_RESULT; /// Helper class for picking fields and rows from delimited text. boost::scoped_ptr<DelimitedTextParser> delimited_text_parser_; @@ -235,25 +237,24 @@ class HdfsSequenceScanner : public BaseSequenceScanner { int64_t len; }; - /// Records are processed in batches. This vector stores batches of record locations + /// Records are processed in batches. This vector stores batches of record locations /// that are being processed. - /// TODO: better perf not to use vector? std::vector<RecordLocation> record_locations_; /// Length of the current sequence file block (or record). - int current_block_length_; + int current_block_length_ = -1; /// Length of the current key. This is specified as 4 bytes in the format description. - int current_key_length_; + int current_key_length_ = -1; - /// Buffer for data read from HDFS or from decompressing the HDFS data. - uint8_t* unparsed_data_buffer_; + /// Buffer for data read from the 'stream_' directly or after decompression. + uint8_t* unparsed_data_buffer_ = nullptr; /// Number of buffered records unparsed_data_buffer_ from block compressed data. - int64_t num_buffered_records_in_compressed_block_; + int64_t num_buffered_records_in_compressed_block_ = 0; /// Next record from block compressed data. - uint8_t* next_record_in_compressed_block_; + uint8_t* next_record_in_compressed_block_ = nullptr; }; } // namespace impala http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index c1a12d6..378b45e 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -17,6 +17,8 @@ #include "exec/hdfs-text-scanner.h" +#include <memory> + #include "codegen/llvm-codegen.h" #include "exec/delimited-text-parser.h" #include "exec/delimited-text-parser.inline.h" @@ -152,22 +154,6 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, return Status::OK(); } -Status HdfsTextScanner::ProcessSplit() { - DCHECK(scan_node_->HasRowBatchQueue()); - HdfsScanNode* scan_node = static_cast<HdfsScanNode*>(scan_node_); - do { - batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(), - scan_node_->mem_tracker()); - RETURN_IF_ERROR(GetNextInternal(batch_)); - scan_node->AddMaterializedRowBatch(batch_); - } while (!eos_ && !scan_node_->ReachedLimit()); - - // Transfer the remaining resources to this new batch in Close(). - batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(), - scan_node_->mem_tracker()); - return Status::OK(); -} - void HdfsTextScanner::Close(RowBatch* row_batch) { DCHECK(!is_closed_); // Need to close the decompressor before transferring the remaining resources to @@ -183,7 +169,8 @@ void HdfsTextScanner::Close(RowBatch* row_batch) { row_batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), false); context_->ReleaseCompletedResources(row_batch, true); if (scan_node_->HasRowBatchQueue()) { - static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch); + static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch( + unique_ptr<RowBatch>(row_batch)); } } else { if (template_tuple_pool_ != nullptr) template_tuple_pool_->FreeAll(); @@ -201,7 +188,7 @@ void HdfsTextScanner::Close(RowBatch* row_batch) { scan_node_->RangeComplete(THdfsFileFormat::TEXT, stream_->file_desc()->file_compression); } - HdfsScanner::Close(row_batch); + CloseInternal(); } Status HdfsTextScanner::InitNewRange() { @@ -321,14 +308,14 @@ Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) { DCHECK_LE(num_tuples, 1); DCHECK_GE(num_tuples, 0); COUNTER_ADD(scan_node_->rows_read_counter(), num_tuples); - RETURN_IF_ERROR(CommitRows(num_tuples, false, row_batch)); + RETURN_IF_ERROR(CommitRows(num_tuples, row_batch)); } else if (delimited_text_parser_->HasUnfinishedTuple()) { DCHECK(scan_node_->materialized_slots().empty()); DCHECK_EQ(scan_node_->num_materialized_partition_keys(), 0); // If no fields are materialized we do not update partial_tuple_empty_, // boundary_column_, or boundary_row_. However, we still need to handle the case // of partial tuple due to missing tuple delimiter at the end of file. - RETURN_IF_ERROR(CommitRows(1, false, row_batch)); + RETURN_IF_ERROR(CommitRows(1, row_batch)); } break; } @@ -417,7 +404,7 @@ Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) { } RETURN_IF_ERROR(boundary_row_.Append(last_row, byte_buffer_ptr_ - last_row)); } - RETURN_IF_ERROR(CommitRows(num_tuples_materialized, false, row_batch)); + RETURN_IF_ERROR(CommitRows(num_tuples_materialized, row_batch)); // Already past the scan range and attempting to complete the last row. if (scan_state_ == PAST_SCAN_RANGE) break; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/hdfs-text-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h index af8fa8a..ceeda8d 100644 --- a/be/src/exec/hdfs-text-scanner.h +++ b/be/src/exec/hdfs-text-scanner.h @@ -51,19 +51,18 @@ class HdfsTextScanner : public HdfsScanner { virtual ~HdfsTextScanner(); /// Implementation of HdfsScanner interface. - virtual Status Open(ScannerContext* context); - virtual Status ProcessSplit(); + virtual Status Open(ScannerContext* context) WARN_UNUSED_RESULT; virtual void Close(RowBatch* row_batch); /// Issue io manager byte ranges for 'files'. static Status IssueInitialRanges(HdfsScanNodeBase* scan_node, - const std::vector<HdfsFileDesc*>& files); + const std::vector<HdfsFileDesc*>& files) WARN_UNUSED_RESULT; /// Codegen WriteAlignedTuples(). Stores the resulting function in /// 'write_aligned_tuples_fn' if codegen was successful or nullptr otherwise. static Status Codegen(HdfsScanNodeBase* node, const std::vector<ScalarExpr*>& conjuncts, - llvm::Function** write_aligned_tuples_fn); + llvm::Function** write_aligned_tuples_fn) WARN_UNUSED_RESULT; /// Suffix for lzo index files. const static std::string LZO_INDEX_SUFFIX; @@ -71,11 +70,11 @@ class HdfsTextScanner : public HdfsScanner { static const char* LLVM_CLASS_NAME; protected: - virtual Status GetNextInternal(RowBatch* row_batch); + virtual Status GetNextInternal(RowBatch* row_batch) WARN_UNUSED_RESULT; /// Reset the scanner. This clears any partial state that needs to /// be cleared when starting or when restarting after an error. - Status ResetScanner(); + Status ResetScanner() WARN_UNUSED_RESULT; /// Current position in byte buffer. char* byte_buffer_ptr_; @@ -103,14 +102,14 @@ class HdfsTextScanner : public HdfsScanner { /// Initializes this scanner for this context. The context maps to a single /// scan range. Advances the scan state to SCAN_RANGE_INITIALIZED. - virtual Status InitNewRange(); + virtual Status InitNewRange() WARN_UNUSED_RESULT; /// Finds the start of the first tuple in this scan range and initializes /// 'byte_buffer_ptr_' to point to the start of first tuple. Advances the scan state /// to FIRST_TUPLE_FOUND, if successful. Otherwise, consumes the whole scan range /// and does not update the scan state (e.g. if there are really large columns). /// Only valid to call in scan state SCAN_RANGE_INITIALIZED. - Status FindFirstTuple(MemPool* pool); + Status FindFirstTuple(MemPool* pool) WARN_UNUSED_RESULT; /// When in scan state FIRST_TUPLE_FOUND, starts or continues processing the scan range /// by reading bytes from 'context_'. Adds materialized tuples that pass the conjuncts @@ -122,11 +121,11 @@ class HdfsTextScanner : public HdfsScanner { /// Advances the scan state to PAST_SCAN_RANGE if all bytes in the scan range have been /// processed. /// Only valid to call in scan state FIRST_TUPLE_FOUND or PAST_SCAN_RANGE. - Status ProcessRange(RowBatch* row_batch, int* num_tuples); + Status ProcessRange(RowBatch* row_batch, int* num_tuples) WARN_UNUSED_RESULT; /// Reads past the end of the scan range for the next tuple end. If successful, /// advances the scan state to DONE. Only valid to call in state PAST_SCAN_RANGE. - Status FinishScanRange(RowBatch* row_batch); + Status FinishScanRange(RowBatch* row_batch) WARN_UNUSED_RESULT; /// Fills the next byte buffer from the context. This will block if there are no bytes /// ready. Updates byte_buffer_ptr_, byte_buffer_end_ and byte_buffer_read_size_. @@ -137,11 +136,12 @@ class HdfsTextScanner : public HdfsScanner { /// If applicable, attaches decompression buffers from previous calls that might still /// be referenced by returned batches to 'pool'. If 'pool' is nullptr the buffers are /// freed instead. - virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0); + virtual Status FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes = 0) + WARN_UNUSED_RESULT; /// Fills the next byte buffer from the compressed data in stream_ by reading the entire /// file, decompressing it, and setting the byte_buffer_ptr_ to the decompressed buffer. - Status FillByteBufferCompressedFile(bool* eosr); + Status FillByteBufferCompressedFile(bool* eosr) WARN_UNUSED_RESULT; /// Fills the next byte buffer from the compressed data in stream_. Unlike /// FillByteBufferCompressedFile(), the entire file does not need to be read at once. @@ -149,21 +149,21 @@ class HdfsTextScanner : public HdfsScanner { /// to available decompressed data. /// Attaches decompression buffers from previous calls that might still be referenced /// by returned batches to 'pool'. If 'pool' is nullptr the buffers are freed instead. - Status FillByteBufferCompressedStream(MemPool* pool, bool* eosr); + Status FillByteBufferCompressedStream(MemPool* pool, bool* eosr) WARN_UNUSED_RESULT; /// Used by FillByteBufferCompressedStream() to decompress data from 'stream_'. /// Returns COMPRESSED_FILE_DECOMPRESSOR_NO_PROGRESS if it needs more input. /// If bytes_to_read > 0, will read specified size. /// If bytes_to_read = -1, will call GetBuffer(). Status DecompressBufferStream(int64_t bytes_to_read, uint8_t** decompressed_buffer, - int64_t* decompressed_len, bool *eosr); + int64_t* decompressed_len, bool *eosr) WARN_UNUSED_RESULT; /// Checks if the current buffer ends with a row delimiter spanning this and the next /// buffer (i.e. a "\r\n" delimiter). Does not modify byte_buffer_ptr_, etc. Always /// returns false if the table's row delimiter is not '\n'. This can only be called /// after the buffer has been fully parsed, i.e. when byte_buffer_ptr_ == /// byte_buffer_end_. - Status CheckForSplitDelimiter(bool* split_delimiter); + Status CheckForSplitDelimiter(bool* split_delimiter) WARN_UNUSED_RESULT; /// Prepends field data that was from the previous file buffer (This field straddled two /// file buffers). 'data' already contains the pointer/len from the current file buffer, @@ -171,7 +171,7 @@ class HdfsTextScanner : public HdfsScanner { /// This function will allocate a new string from the tuple pool, concatenate the /// two pieces and update 'data' to contain the new pointer/len. Return error status if /// memory limit is exceeded when allocating a new string. - Status CopyBoundaryField(FieldLocation* data, MemPool* pool); + Status CopyBoundaryField(FieldLocation* data, MemPool* pool) WARN_UNUSED_RESULT; /// Writes intermediate parsed data into 'tuple_', evaluates conjuncts, and appends /// surviving rows to 'row'. Advances 'tuple_' and 'row' as necessary. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/kudu-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc index 08a3339..0845d9a 100644 --- a/be/src/exec/kudu-scan-node.cc +++ b/be/src/exec/kudu-scan-node.cc @@ -97,9 +97,9 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos } *eos = false; - RowBatch* materialized_batch = materialized_row_batches_->GetBatch(); + unique_ptr<RowBatch> materialized_batch = materialized_row_batches_->GetBatch(); if (materialized_batch != NULL) { - row_batch->AcquireState(materialized_batch); + row_batch->AcquireState(materialized_batch.get()); num_rows_returned_ += row_batch->num_rows(); COUNTER_SET(rows_returned_counter_, num_rows_returned_); @@ -114,7 +114,7 @@ Status KuduScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos done_ = true; materialized_row_batches_->Shutdown(); } - delete materialized_batch; + materialized_batch.reset(); } else { *eos = true; } @@ -172,15 +172,16 @@ Status KuduScanNode::ProcessScanToken(KuduScanner* scanner, const string& scan_t RETURN_IF_ERROR(scanner->OpenNextScanToken(scan_token)); bool eos = false; while (!eos && !done_) { - gscoped_ptr<RowBatch> row_batch(new RowBatch( - row_desc(), runtime_state_->batch_size(), mem_tracker())); + unique_ptr<RowBatch> row_batch = std::make_unique<RowBatch>(row_desc(), + runtime_state_->batch_size(), mem_tracker()); RETURN_IF_ERROR(scanner->GetNext(row_batch.get(), &eos)); while (!done_) { scanner->KeepKuduScannerAlive(); - if (materialized_row_batches_->AddBatchWithTimeout(row_batch.get(), 1000000)) { - ignore_result(row_batch.release()); + if (materialized_row_batches_->BlockingPutWithTimeout(move(row_batch), 1000000)) { break; } + // Make sure that we still own the RowBatch if BlockingPutWithTimeout() timed out. + DCHECK(row_batch != nullptr); } } if (eos) scan_ranges_complete_counter()->Add(1); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/exec/scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h index 69fd53d..87b9c71 100644 --- a/be/src/exec/scan-node.h +++ b/be/src/exec/scan-node.h @@ -85,7 +85,7 @@ class ScanNode : public ExecNode { active_scanner_thread_counter_(TUnit::UNIT, 0), active_hdfs_read_thread_counter_(TUnit::UNIT, 0) {} - virtual Status Prepare(RuntimeState* state); + virtual Status Prepare(RuntimeState* state) WARN_UNUSED_RESULT; /// This should be called before Prepare(), and the argument must be not destroyed until /// after Prepare(). http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/be/src/util/blocking-queue.h ---------------------------------------------------------------------- diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h index 5d88397..a32345c 100644 --- a/be/src/util/blocking-queue.h +++ b/be/src/util/blocking-queue.h @@ -23,6 +23,7 @@ #include <boost/thread/mutex.hpp> #include <boost/scoped_ptr.hpp> #include <deque> +#include <memory> #include <unistd.h> #include "common/atomic.h" @@ -92,7 +93,7 @@ class BlockingQueue : public CacheLineAligned { } DCHECK(!get_list_.empty()); - *out = get_list_.front(); + *out = std::move(get_list_.front()); get_list_.pop_front(); get_list_size_.Store(get_list_.size()); read_lock.unlock(); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/931bf49c/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test b/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test index e396583..3f62b2e 100644 --- a/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test +++ b/testdata/workloads/functional-query/queries/DataErrorsTest/avro-errors.test @@ -10,8 +10,7 @@ string row_regex: .*Problem parsing file $NAMENODE/.* File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/truncated_string.avro' is corrupt: truncated data block at offset 155 File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/negative_string_len.avro' is corrupt: invalid length -7 at offset 164 -File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/invalid_union.avro' is corrupt: invalid union value 4 at offset 174 -File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/invalid_union.avro' is corrupt: invalid encoded integer at offset 191 +File '$NAMENODE/test-warehouse/bad_avro_snap_strings_avro_snap/invalid_union.avro' is corrupt: invalid union value 4 at offset 174 (1 of 2 similar) ==== ---- QUERY # Read from the corrupt files. We may get partial results.
