http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h index 36e95b5..2efb24e 100644 --- a/be/src/exec/hdfs-scan-node.h +++ b/be/src/exec/hdfs-scan-node.h @@ -19,83 +19,29 @@ #ifndef IMPALA_EXEC_HDFS_SCAN_NODE_H_ #define IMPALA_EXEC_HDFS_SCAN_NODE_H_ -#include <vector> -#include <memory> +#include <map> #include <stdint.h> +#include <vector> -#include <boost/unordered_map.hpp> -#include <boost/unordered_set.hpp> #include <boost/scoped_ptr.hpp> -#include <boost/thread/condition_variable.hpp> #include <boost/thread/mutex.hpp> #include "exec/filter-context.h" -#include "exec/scan-node.h" -#include "exec/scanner-context.h" -#include "runtime/descriptors.h" +#include "exec/hdfs-scan-node-base.h" #include "runtime/disk-io-mgr.h" -#include "util/avro-util.h" #include "util/counting-barrier.h" -#include "util/progress-updater.h" -#include "util/spinlock.h" #include "util/thread.h" -#include "gen-cpp/PlanNodes_types.h" - namespace impala { class DescriptorTbl; -class HdfsScanner; +class ObjectPool; +class RuntimeState; class RowBatch; -class RuntimeFilter; -class Status; -class Tuple; class TPlanNode; -class TScanRange; - -/// Maintains per file information for files assigned to this scan node. This includes -/// all the splits for the file. Note that it is not thread-safe. -struct HdfsFileDesc { - /// Connection to the filesystem containing the file. - hdfsFS fs; - - /// File name including the path. - std::string filename; - - /// Length of the file. This is not related to which parts of the file have been - /// assigned to this node. - int64_t file_length; - - /// Last modified time - int64_t mtime; - - THdfsCompression::type file_compression; - - /// Splits (i.e. raw byte ranges) for this file, assigned to this scan node. 
- std::vector<DiskIoMgr::ScanRange*> splits; - HdfsFileDesc(const std::string& filename) - : filename(filename), file_length(0), mtime(0), - file_compression(THdfsCompression::NONE) { - } -}; - -/// Struct for additional metadata for scan ranges. This contains the partition id -/// that this scan range is for. -struct ScanRangeMetadata { - /// The partition id that this range is part of. - int64_t partition_id; - - /// For parquet scan ranges we initially create a request for the file footer for each - /// split; we store a pointer to the actual split so that we can recover its information - /// for the scanner to process. - const DiskIoMgr::ScanRange* original_split; - - ScanRangeMetadata(int64_t partition_id, const DiskIoMgr::ScanRange* original_split) - : partition_id(partition_id), original_split(original_split) { } -}; -/// A ScanNode implementation that is used for all tables read directly from -/// HDFS-serialised data. +/// Legacy ScanNode implementation used in the non-multi-threaded execution mode +/// that is used for all tables read directly from HDFS-serialised data. /// A HdfsScanNode spawns multiple scanner threads to process the bytes in /// parallel. There is a handshake between the scan node and the scanners /// to get all the splits queued and bytes processed. @@ -111,147 +57,36 @@ struct ScanRangeMetadata { /// 5. The scanner finishes the scan range and informs the scan node so it can track /// end of stream. /// -/// An HdfsScanNode may expect to receive runtime filters produced elsewhere in the plan -/// (even from remote fragments). These filters arrive asynchronously during execution, -/// and are applied as soon as they arrive. Filters may be applied by the scan node in the -/// following scopes: -/// -/// 1. Per-file (all file formats, partition column filters only) - filtering at this -/// scope saves IO as the filters are applied before scan ranges are issued. -/// 2. 
Per-scan-range (all file formats, partition column filters only) - filtering at -/// this scope saves CPU as filtered scan ranges are never scanned. -/// -/// Scanners may also use the same filters to eliminate rows at finer granularities -/// (e.g. per row). -/// -/// TODO: this class allocates a bunch of small utility objects that should be +/// TODO: This class allocates a bunch of small utility objects that should be /// recycled. -class HdfsScanNode : public ScanNode { +/// TODO: Remove this class once the fragment-based multi-threaded execution is +/// fully functional. +class HdfsScanNode : public HdfsScanNodeBase { public: HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); - ~HdfsScanNode(); - /// ExecNode methods - virtual Status Init(const TPlanNode& tnode, RuntimeState* state); virtual Status Prepare(RuntimeState* state); virtual Status Open(RuntimeState* state); virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos); - virtual Status Reset(RuntimeState* state); virtual void Close(RuntimeState* state); - int limit() const { return limit_; } - - const std::vector<SlotDescriptor*>& materialized_slots() - const { return materialized_slots_; } - - /// Returns the tuple idx into the row for this scan node to output to. - /// Currently this is always 0. - int tuple_idx() const { return 0; } - - /// Returns number of partition keys in the table. - int num_partition_keys() const { return hdfs_table_->num_clustering_cols(); } - - /// Returns number of partition key slots. 
- int num_materialized_partition_keys() const { return partition_key_slots_.size(); } - - const TupleDescriptor* tuple_desc() const { return tuple_desc_; } - - const HdfsTableDescriptor* hdfs_table() { return hdfs_table_; } - - const AvroSchemaElement& avro_schema() { return *avro_schema_.get(); } - - RuntimeState* runtime_state() { return runtime_state_; } + virtual bool HasRowBatchQueue() const { return true; } - int skip_header_line_count() const { return skip_header_line_count_; } + bool done() const { return done_; } - DiskIoRequestContext* reader_context() { return reader_context_; } - - typedef std::map<TupleId, std::vector<ExprContext*>> ConjunctsMap; - const ConjunctsMap& conjuncts_map() const { return conjuncts_map_; } - - RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length() { - return max_compressed_text_file_length_; - } - - const static int SKIP_COLUMN = -1; - - /// Returns index into materialized_slots with 'path'. Returns SKIP_COLUMN if - /// that path is not materialized. - int GetMaterializedSlotIdx(const std::vector<int>& path) const { - PathToSlotIdxMap::const_iterator result = path_to_materialized_slot_idx_.find(path); - if (result == path_to_materialized_slot_idx_.end()) return SKIP_COLUMN; - return result->second; - } - - /// The result array is of length hdfs_table_->num_cols(). The i-th element is true iff - /// column i should be materialized. - const bool* is_materialized_col() { - return reinterpret_cast<const bool*>(&is_materialized_col_[0]); - } - - /// Returns the per format codegen'd function. Scanners call this to get the - /// codegen'd function to use. Returns NULL if codegen should not be used. - void* GetCodegenFn(THdfsFileFormat::type); - - inline void IncNumScannersCodegenEnabled() { - num_scanners_codegen_enabled_.Add(1); - } - - inline void IncNumScannersCodegenDisabled() { - num_scanners_codegen_disabled_.Add(1); - } + /// Adds ranges to the io mgr queue and starts up new scanner threads if possible. 
+ virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges, + int num_files_queued); /// Adds a materialized row batch for the scan node. This is called from scanner /// threads. /// This function will block if materialized_row_batches_ is full. void AddMaterializedRowBatch(RowBatch* row_batch); - /// Allocate a new scan range object, stored in the runtime state's object pool. For - /// scan ranges that correspond to the original hdfs splits, the partition id must be - /// set to the range's partition id. For other ranges (e.g. columns in parquet, read - /// past buffers), the partition_id is unused. expected_local should be true if this - /// scan range is not expected to require a remote read. The range must fall within - /// the file bounds. That is, the offset must be >= 0, and offset + len <= file_length. - /// If not NULL, the 'original_split' pointer is stored for reference in the scan range - /// metadata of the scan range that is to be allocated. + /// Sets the scanner specific metadata for 'filename'. /// This is thread safe. - DiskIoMgr::ScanRange* AllocateScanRange( - hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id, - int disk_id, bool try_cache, bool expected_local, int64_t mtime, - const DiskIoMgr::ScanRange* original_split = NULL); - - /// Adds ranges to the io mgr queue and starts up new scanner threads if possible. - /// 'num_files_queued' indicates how many file's scan ranges have been added - /// completely. A file's scan ranges are added completely if no new scanner threads - /// will be needed to process that file besides the additional threads needed to - /// process those in 'ranges'. - Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges, - int num_files_queued); - - /// Adds all splits for file_desc to the io mgr queue and indicates one file has - /// been added completely. 
- inline Status AddDiskIoRanges(const HdfsFileDesc* file_desc) { - return AddDiskIoRanges(file_desc->splits, 1); - } - - /// Allocates and initialises template_tuple_ with any values from the partition columns - /// for the current scan range - /// Returns NULL if there are no partition keys slots. - /// TODO: cache the tuple template in the partition object. - Tuple* InitTemplateTuple(RuntimeState* state, - const std::vector<ExprContext*>& value_ctxs); - - /// Allocates and return an empty template tuple (i.e. with no values filled in). - /// Scanners can use this method to initialize a template tuple even if there are no - /// partition keys slots (e.g. to hold Avro default values). - Tuple* InitEmptyTemplateTuple(const TupleDescriptor& tuple_desc); - - /// Acquires all allocations from pool into scan_node_pool_. Thread-safe. - void TransferToScanNodePool(MemPool* pool); - - /// Returns the file desc for 'filename'. Returns NULL if filename is invalid. - HdfsFileDesc* GetFileDesc(const std::string& filename); + void SetFileMetadata(const std::string& filename, void* metadata); /// Gets scanner specific metadata for 'filename'. Scanners can use this to store /// file header information. @@ -259,117 +94,18 @@ class HdfsScanNode : public ScanNode { /// This is thread safe. void* GetFileMetadata(const std::string& filename); - /// Sets the scanner specific metadata for 'filename'. - /// This is thread safe. - void SetFileMetadata(const std::string& filename, void* metadata); - - /// Called by the scanner when a range is complete. Used to trigger done_ and - /// to log progress. This *must* only be called after the scanner has completely - /// finished the scan range (i.e. context->Flush()), and has added the final - /// row batch to the row batch queue. Otherwise the last batch may be - /// lost due to racing with shutting down the row batch queue. 
- void RangeComplete(const THdfsFileFormat::type& file_type, - const THdfsCompression::type& compression_type); - /// Same as above except for when multiple compression codecs were used - /// in the file. The metrics are incremented for each compression_type. - void RangeComplete(const THdfsFileFormat::type& file_type, + /// Called by scanners when a range is complete. Used to record progress and set done_. + /// This *must* only be called after a scanner has completely finished its + /// scan range (i.e. context->Flush()), and has added the final row batch to the row + /// batch queue. Otherwise, we may lose the last batch due to racing with shutting down + /// the RowBatch queue. + virtual void RangeComplete(const THdfsFileFormat::type& file_type, const std::vector<THdfsCompression::type>& compression_type); - /// Utility function to compute the order in which to materialize slots to allow for - /// computing conjuncts as slots get materialized (on partial tuples). - /// 'order' will contain for each slot, the first conjunct it is associated with. - /// e.g. order[2] = 1 indicates materialized_slots[2] must be materialized before - /// evaluating conjuncts[1]. Slots that are not referenced by any conjuncts will have - /// order set to conjuncts.size() - void ComputeSlotMaterializationOrder(std::vector<int>* order) const; - - /// Returns true if there are no materialized slots, such as a count(*) over the table. - inline bool IsZeroSlotTableScan() const { - return materialized_slots().empty() && tuple_desc()->tuple_path().empty(); - } - - /// map from volume id to <number of split, per volume split lengths> - typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumnStats; - - /// Update the per volume stats with the given scan range params list - static void UpdateHdfsSplitStats( - const std::vector<TScanRangeParams>& scan_range_params_list, - PerVolumnStats* per_volume_stats); - - /// Output the per_volume_stats to stringstream. 
The output format is a list of: - /// <volume id>:<# splits>/<per volume split lengths> - static void PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats, - std::stringstream* ss); - - /// Description string for the per volume stats output. - static const std::string HDFS_SPLIT_STATS_DESC; - - /// Returns true if partition 'partition_id' passes all the filter predicates in - /// 'filter_ctxs' and should not be filtered out. 'stats_name' is the key of one of the - /// counter groups in FilterStats, and is used to update the correct statistics. - /// - /// 'filter_ctxs' is either an empty list, in which case filtering is disabled and the - /// function returns true, or a set of filter contexts to evaluate. - bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name, - const std::vector<FilterContext>& filter_ctxs); - - const std::vector<FilterContext> filter_ctxs() const { return filter_ctxs_; } - - protected: - friend class ScannerContext; - friend class HdfsScanner; - - RuntimeState* runtime_state_; - - // Number of header lines to skip at the beginning of each file of this table. Only set - // to values > 0 for hdfs text files. - const int skip_header_line_count_; - - /// Tuple id resolved in Prepare() to set tuple_desc_ - const int tuple_id_; - - /// RequestContext object to use with the disk-io-mgr for reads. - DiskIoRequestContext* reader_context_; - - /// Descriptor for tuples this scan node constructs - const TupleDescriptor* tuple_desc_; - - /// Map from partition ID to a template tuple (owned by scan_node_pool_) which has only - /// the partition columns for that partition materialized. Used to filter files and scan - /// ranges on partition-column filters. Populated in Prepare(). - boost::unordered_map<int64_t, Tuple*> partition_template_tuple_map_; - - /// Descriptor for the hdfs table, including partition and format metadata. 
- /// Set in Prepare, owned by RuntimeState - const HdfsTableDescriptor* hdfs_table_; - - /// The root of the table's Avro schema, if we're scanning an Avro table. - ScopedAvroSchemaElement avro_schema_; - - /// If true, the warning that some disk ids are unknown was logged. Only log this once - /// per scan node since it can be noisy. - bool unknown_disk_id_warned_; - - /// Partitions scanned by this scan node. - boost::unordered_set<int64_t> partition_ids_; - - /// File path => file descriptor (which includes the file's splits) - typedef std::map<std::string, HdfsFileDesc*> FileDescMap; - FileDescMap file_descs_; - - /// File format => file descriptors. - typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>> FileFormatsMap; - FileFormatsMap per_type_files_; - - /// Conjuncts for each materialized tuple (top-level row batch tuples and collection - /// item tuples). Includes a copy of ExecNode.conjuncts_. - ConjunctsMap conjuncts_map_; - - /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on - /// the first call to GetNext(). The token manager, in a different thread, will read - /// this variable. - bool initial_ranges_issued_; + /// Acquires all allocations from pool into scan_node_pool_. Thread-safe. + void TransferToScanNodePool(MemPool* pool); + private: /// Released when initial ranges are issued in the first call to GetNext(). CountingBarrier ranges_issued_barrier_; @@ -378,44 +114,6 @@ class HdfsScanNode : public ScanNode { /// scanner threads. int64_t scanner_thread_bytes_required_; - /// Number of files that have not been issued from the scanners. - AtomicInt32 num_unqueued_files_; - - /// Map of HdfsScanner objects to file types. Only one scanner object will be - /// created for each file type. Objects stored in runtime_state's pool. - typedef std::map<THdfsFileFormat::type, HdfsScanner*> ScannerMap; - ScannerMap scanner_map_; - - /// Per scanner type codegen'd fn. 
- typedef std::map<THdfsFileFormat::type, void*> CodegendFnMap; - CodegendFnMap codegend_fn_map_; - - /// Maps from a slot's path to its index into materialized_slots_. - typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap; - PathToSlotIdxMap path_to_materialized_slot_idx_; - - /// List of contexts for expected runtime filters for this scan node. These contexts are - /// cloned by individual scanners to be used in multi-threaded contexts, passed through - /// the per-scanner ScannerContext.. - std::vector<FilterContext> filter_ctxs_; - - /// is_materialized_col_[i] = <true i-th column should be materialized, false otherwise> - /// for 0 <= i < total # columns in table - // - /// This should be a vector<bool>, but bool vectors are special-cased and not stored - /// internally as arrays, so instead we store as chars and cast to bools as needed - std::vector<char> is_materialized_col_; - - /// Vector containing slot descriptors for all non-partition key slots. These - /// descriptors are sorted in order of increasing col_pos. - std::vector<SlotDescriptor*> materialized_slots_; - - /// Vector containing slot descriptors for all partition key slots. - std::vector<SlotDescriptor*> partition_key_slots_; - - /// Keeps track of total splits and the number finished. - ProgressUpdater progress_; - /// Scanner specific per file metadata (e.g. header information) and associated lock. /// This lock cannot be taken together with any other locks except lock_. boost::mutex metadata_lock_; @@ -431,43 +129,15 @@ class HdfsScanNode : public ScanNode { /// Maximum size of materialized_row_batches_. int max_materialized_row_batches_; - /// This is the number of io buffers that are owned by the scan node and the scanners. - /// This is used just to help debug leaked io buffers to determine if the leak is - /// happening in the scanners vs other parts of the execution. 
- AtomicInt32 num_owned_io_buffers_; - - /// Counters which track the number of scanners that have codegen enabled for the - /// materialize and conjuncts evaluation code paths. - AtomicInt32 num_scanners_codegen_enabled_; - AtomicInt32 num_scanners_codegen_disabled_; - - /// The size of the largest compressed text file to be scanned. This is used to - /// estimate scanner thread memory usage. - RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length_; - - /// Disk accessed bitmap - RuntimeProfile::Counter disks_accessed_bitmap_; - - /// Total number of bytes read locally - RuntimeProfile::Counter* bytes_read_local_; - - /// Total number of bytes read via short circuit read - RuntimeProfile::Counter* bytes_read_short_circuit_; - - /// Total number of bytes read from data node cache - RuntimeProfile::Counter* bytes_read_dn_cache_; - - /// Total number of remote scan ranges - RuntimeProfile::Counter* num_remote_ranges_; - - /// Total number of bytes read remotely that were expected to be local - RuntimeProfile::Counter* unexpected_remote_bytes_; - /// Lock protects access between scanner thread and main query thread (the one calling /// GetNext()) for all fields below. If this lock and any other locks needs to be taken /// together, this lock must be taken first. boost::mutex lock_; + /// Protects file_type_counts_. Cannot be taken together with any other lock + /// except lock_, and if so, lock_ must be taken first. + SpinLock file_type_counts_; + /// Flag signaling that all scanner threads are done. This could be because they /// are finished, an error/cancellation occurred, or the limit was reached. /// Setting this to true triggers the scanner threads to clean up. @@ -478,27 +148,6 @@ class HdfsScanNode : public ScanNode { /// being processed by scanner threads, but no new ScannerThreads should be started. bool all_ranges_started_; - /// Pool for allocating some amounts of memory that is shared between scanners. - /// e.g. 
partition key tuple and their string buffers - boost::scoped_ptr<MemPool> scan_node_pool_; - - /// Status of failed operations. This is set in the ScannerThreads - /// Returned in GetNext() if an error occurred. An non-ok status triggers cleanup - /// scanner threads. - Status status_; - - /// Mapping of file formats (file type, compression type) to the number of - /// splits of that type and the lock protecting it. - /// This lock cannot be taken together with any other lock except lock_. - SpinLock file_type_counts_lock_; - typedef std::map< - std::pair<THdfsFileFormat::type, THdfsCompression::type>, int> FileTypeCountsMap; - FileTypeCountsMap file_type_counts_; - - /// If true, counters are actively running and need to be reported in the runtime - /// profile. - bool counters_running_; - /// The id of the callback added to the thread resource manager when thread token /// is available. Used to remove the callback before this scan node is destroyed. /// -1 if no callback is registered. @@ -519,11 +168,6 @@ class HdfsScanNode : public ScanNode { /// (e.g., when adding new ranges) or when threads are available for this scan node. void ThreadTokenAvailableCb(ThreadResourceMgr::ResourcePool* pool); - /// Create and open new scanner for this partition type. - /// If the scanner is successfully created, it is returned in 'scanner'. - Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition, - ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner); - /// Main function for scanner thread. This thread pulls the next range to be /// processed from the IoMgr and then processes the entire range end to end. /// This thread terminates when all scan ranges are complete or an error occurred. @@ -546,42 +190,9 @@ class HdfsScanNode : public ScanNode { /// Checks for eos conditions and returns batches from materialized_row_batches_. 
Status GetNextInternal(RuntimeState* state, RowBatch* row_batch, bool* eos); - /// sets done_ to true and triggers threads to cleanup. Cannot be calld with + /// sets done_ to true and triggers threads to cleanup. Cannot be called with /// any locks taken. Calling it repeatedly ignores subsequent calls. void SetDone(); - - /// Stops periodic counters and aggregates counter values for the entire scan node. - /// This should be called as soon as the scan node is complete to get the most accurate - /// counter values. - /// This can be called multiple times, subsequent calls will be ignored. - /// This must be called on Close() to unregister counters. - void StopAndFinalizeCounters(); - - /// Recursively initializes all NULL collection slots to an empty CollectionValue in - /// addition to maintaining the null bit. Hack to allow UnnestNode to project out - /// collection slots. Assumes that the null bit has already been un/set. - /// TODO: remove this function once the TODOs in UnnestNode regarding projection - /// have been addressed. - void InitNullCollectionValues(const TupleDescriptor* tuple_desc, Tuple* tuple) const; - - /// Helper to call InitNullCollectionValues() on all tuples produced by this scan - /// in 'row_batch'. - void InitNullCollectionValues(RowBatch* row_batch) const; - - /// Returns false if, according to filters in 'filter_ctxs', 'file' should be filtered - /// and therefore not processed. 'file_type' is the the format of 'file', and is used - /// for bookkeeping. Returns true if all filters pass or are not present. - bool FilePassesFilterPredicates(const std::vector<FilterContext>& filter_ctxs, - const THdfsFileFormat::type& file_type, HdfsFileDesc* file); - - /// Waits for up to time_ms for runtime filters to arrive, checking every 20ms. Returns - /// true if all filters arrived within the time limit (as measured from the time of - /// RuntimeFilterBank::RegisterFilter()), false otherwise. 
- bool WaitForRuntimeFilters(int32_t time_ms); - - /// Calls ExecDebugAction(). Returns the status based on the debug action specified - /// for the query. - Status TriggerDebugAction(); }; }
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scanner-ir.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner-ir.cc b/be/src/exec/hdfs-scanner-ir.cc index 2b053ca..e6da220 100644 --- a/be/src/exec/hdfs-scanner-ir.cc +++ b/be/src/exec/hdfs-scanner-ir.cc @@ -37,7 +37,7 @@ int HdfsScanner::WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row, int row_ FieldLocation* fields, int num_tuples, int max_added_tuples, int slots_per_tuple, int row_idx_start) { - DCHECK(add_batches_to_queue_); + DCHECK(scan_node_->HasRowBatchQueue()); DCHECK(tuple_ != NULL); uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(tuple_row); uint8_t* tuple_mem = reinterpret_cast<uint8_t*>(tuple_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index ad3d834..841d1e4 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -26,6 +26,7 @@ #include "common/object-pool.h" #include "exec/text-converter.h" #include "exec/hdfs-scan-node.h" +#include "exec/hdfs-scan-node-mt.h" #include "exec/read-write-util.h" #include "exec/text-converter.inline.h" #include "exprs/expr-context.h" @@ -56,20 +57,20 @@ using namespace strings; const char* FieldLocation::LLVM_CLASS_NAME = "struct.impala::FieldLocation"; const char* HdfsScanner::LLVM_CLASS_NAME = "class.impala::HdfsScanner"; -HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state, - bool add_batches_to_queue) +HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state) : scan_node_(scan_node), state_(state), - add_batches_to_queue_(add_batches_to_queue), context_(NULL), stream_(NULL), + eos_(false), scanner_conjunct_ctxs_(NULL), + template_tuple_pool_(new MemPool(scan_node->mem_tracker())), template_tuple_(NULL), 
tuple_byte_size_(scan_node->tuple_desc()->byte_size()), + num_null_bytes_(scan_node->tuple_desc()->num_null_bytes()), tuple_(NULL), batch_(NULL), tuple_mem_(NULL), - num_null_bytes_(scan_node->tuple_desc()->num_null_bytes()), parse_status_(Status::OK()), decompression_type_(THdfsCompression::NONE), data_buffer_pool_(new MemPool(scan_node->mem_tracker())), @@ -80,16 +81,17 @@ HdfsScanner::HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state, HdfsScanner::HdfsScanner() : scan_node_(NULL), state_(NULL), - add_batches_to_queue_(true), context_(NULL), stream_(NULL), + eos_(false), scanner_conjunct_ctxs_(NULL), + template_tuple_pool_(NULL), template_tuple_(NULL), tuple_byte_size_(-1), + num_null_bytes_(-1), tuple_(NULL), batch_(NULL), tuple_mem_(NULL), - num_null_bytes_(-1), parse_status_(Status::OK()), decompression_type_(THdfsCompression::NONE), data_buffer_pool_(NULL), @@ -107,19 +109,23 @@ Status HdfsScanner::Open(ScannerContext* context) { // Clone the scan node's conjuncts map. The cloned contexts must be closed by the // caller. - HdfsScanNode::ConjunctsMap::const_iterator iter = scan_node_->conjuncts_map().begin(); - for (; iter != scan_node_->conjuncts_map().end(); ++iter) { - RETURN_IF_ERROR(Expr::CloneIfNotExists(iter->second, - scan_node_->runtime_state(), &scanner_conjuncts_map_[iter->first])); + for (const auto& entry: scan_node_->conjuncts_map()) { + RETURN_IF_ERROR(Expr::CloneIfNotExists(entry.second, + scan_node_->runtime_state(), &scanner_conjuncts_map_[entry.first])); } DCHECK(scanner_conjuncts_map_.find(scan_node_->tuple_desc()->id()) != scanner_conjuncts_map_.end()); scanner_conjunct_ctxs_ = &scanner_conjuncts_map_[scan_node_->tuple_desc()->id()]; // Initialize the template_tuple_. 
- template_tuple_ = scan_node_->InitTemplateTuple( - state_, context_->partition_descriptor()->partition_key_value_ctxs()); + vector<ExprContext*> partition_key_value_ctxs; + RETURN_IF_ERROR(Expr::CloneIfNotExists( + context_->partition_descriptor()->partition_key_value_ctxs(), state_, + &partition_key_value_ctxs)); + template_tuple_ = scan_node_->InitTemplateTuple(partition_key_value_ctxs, + template_tuple_pool_.get(), state_); template_tuple_map_[scan_node_->tuple_desc()] = template_tuple_; + Expr::Close(partition_key_value_ctxs, state_); decompress_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DecompressionTime"); return Status::OK(); @@ -127,10 +133,7 @@ Status HdfsScanner::Open(ScannerContext* context) { void HdfsScanner::Close(RowBatch* row_batch) { if (decompressor_.get() != NULL) decompressor_->Close(); - HdfsScanNode::ConjunctsMap::const_iterator iter = scanner_conjuncts_map_.begin(); - for (; iter != scanner_conjuncts_map_.end(); ++iter) { - Expr::Close(iter->second, state_); - } + for (const auto& entry: scanner_conjuncts_map_) Expr::Close(entry.second, state_); obj_pool_.Clear(); stream_ = NULL; context_->ClearStreams(); @@ -158,7 +161,7 @@ Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition, } Status HdfsScanner::StartNewRowBatch() { - DCHECK(add_batches_to_queue_); + DCHECK(scan_node_->HasRowBatchQueue()); batch_ = new RowBatch(scan_node_->row_desc(), state_->batch_size(), scan_node_->mem_tracker()); int64_t tuple_buffer_size; @@ -168,7 +171,7 @@ Status HdfsScanner::StartNewRowBatch() { } int HdfsScanner::GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem) { - DCHECK(add_batches_to_queue_); + DCHECK(scan_node_->HasRowBatchQueue()); DCHECK(batch_ != NULL); DCHECK_GT(batch_->capacity(), batch_->num_rows()); *pool = batch_->tuple_data_pool(); @@ -190,7 +193,7 @@ Status HdfsScanner::GetCollectionMemory(CollectionValueBuilder* builder, MemPool // TODO(skye): have this check scan_node_->ReachedLimit() and 
get rid of manual check? Status HdfsScanner::CommitRows(int num_rows) { - DCHECK(add_batches_to_queue_); + DCHECK(scan_node_->HasRowBatchQueue()); DCHECK(batch_ != NULL); DCHECK_LE(num_rows, batch_->capacity() - batch_->num_rows()); batch_->CommitRows(num_rows); @@ -201,16 +204,15 @@ Status HdfsScanner::CommitRows(int num_rows) { // if no rows passed predicates. if (batch_->AtCapacity() || context_->num_completed_io_buffers() > 0) { context_->ReleaseCompletedResources(batch_, /* done */ false); - scan_node_->AddMaterializedRowBatch(batch_); + static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(batch_); RETURN_IF_ERROR(StartNewRowBatch()); } if (context_->cancelled()) return Status::CANCELLED; // Check for UDF errors. RETURN_IF_ERROR(state_->GetQueryStatus()); // Free local expr allocations for this thread - HdfsScanNode::ConjunctsMap::const_iterator iter = scanner_conjuncts_map_.begin(); - for (; iter != scanner_conjuncts_map_.end(); ++iter) { - ExprContext::FreeLocalAllocations(iter->second); + for (const auto& entry: scanner_conjuncts_map_) { + ExprContext::FreeLocalAllocations(entry.second); } return Status::OK(); } @@ -357,8 +359,9 @@ bool HdfsScanner::WriteCompleteTuple(MemPool* pool, FieldLocation* fields, // eval_fail: ; preds = %parse // ret i1 false // } -Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNode* node, LlvmCodeGen* codegen, - const vector<ExprContext*>& conjunct_ctxs, Function** write_complete_tuple_fn) { +Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNodeBase* node, + LlvmCodeGen* codegen, const vector<ExprContext*>& conjunct_ctxs, + Function** write_complete_tuple_fn) { *write_complete_tuple_fn = NULL; SCOPED_TIMER(codegen->codegen_timer()); RuntimeState* state = node->runtime_state(); @@ -565,8 +568,9 @@ Status HdfsScanner::CodegenWriteCompleteTuple(HdfsScanNode* node, LlvmCodeGen* c return Status::OK(); } -Status HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNode* node, LlvmCodeGen* codegen, - Function* 
write_complete_tuple_fn, Function** write_aligned_tuples_fn) { +Status HdfsScanner::CodegenWriteAlignedTuples(HdfsScanNodeBase* node, + LlvmCodeGen* codegen, Function* write_complete_tuple_fn, + Function** write_aligned_tuples_fn) { *write_aligned_tuples_fn = NULL; SCOPED_TIMER(codegen->codegen_timer()); DCHECK(write_complete_tuple_fn != NULL); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h index 6245e5f..d9fa424 100644 --- a/be/src/exec/hdfs-scanner.h +++ b/be/src/exec/hdfs-scanner.h @@ -27,7 +27,7 @@ #include "codegen/impala-ir.h" #include "common/object-pool.h" -#include "exec/hdfs-scan-node.h" +#include "exec/hdfs-scan-node-base.h" #include "exec/scan-node.h" #include "exec/scanner-context.h" #include "runtime/disk-io-mgr.h" @@ -75,8 +75,10 @@ struct FieldLocation { /// 1. Open() /// 2. ProcessSplit() or GetNext()* /// 3. Close() -/// The scanner can be used in either of two modes, indicated via the add_batches_to_queue -/// c'tor parameter. +/// The scanner can be used in either of two modes. Which mode is expected to be used +/// depends on the type of parent scan node. Parent scan nodes with a row batch queue +/// are expected to call ProcessSplit() and not GetNext(). Row batches will be added to +/// the scan node's row batch queue, including the final one in Close(). /// ProcessSplit() scans the split and adds materialized row batches to the scan node's /// row batch queue until the split is complete or an error occurred. /// GetNext() provides an iterator-like interface where the caller can request @@ -103,16 +105,15 @@ struct FieldLocation { /// resources (IO buffers and mem pools) to the current row batch, and passing row batches /// up to the scan node. Subclasses can also use GetMemory() to help with per-row memory /// management. 
+/// TODO: Have a pass over all members and move them out of the base class if sensible +/// to clarify which state each concrete scanner type actually has. class HdfsScanner { public: /// Assumed size of an OS file block. Used mostly when reading file format headers, etc. /// This probably ought to be a derived number from the environment. const static int FILE_BLOCK_SIZE = 4096; - /// If 'add_batches_to_queue' is true the caller must call ProcessSplit() and not - /// GetNext(). Row batches will be added to the scan node's row batch queue, including - /// the final one in Close(). - HdfsScanner(HdfsScanNode* scan_node, RuntimeState* state, bool add_batches_to_queue); + HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state); virtual ~HdfsScanner(); @@ -125,16 +126,16 @@ class HdfsScanner { /// returned, 'parse_status_' is guaranteed to be OK as well. /// The memory referenced by the tuples is valid until this or any subsequently /// returned batch is reset or destroyed. - /// Only valid to call if 'add_batches_to_queue_' is false. - Status GetNext(RowBatch* row_batch, bool* eos) { - DCHECK(!add_batches_to_queue_); - return GetNextInternal(row_batch, eos); + /// Only valid to call if the parent scan node is single-threaded. + Status GetNext(RowBatch* row_batch) { + DCHECK(!scan_node_->HasRowBatchQueue()); + return GetNextInternal(row_batch); } /// Process an entire split, reading bytes from the context's streams. Context is /// initialized with the split data (e.g. template tuple, partition descriptor, etc). /// This function should only return on error or end of scan range. - /// Only valid to call if 'add_batches_to_queue_' is true. + /// Only valid to call if the parent scan node is multi-threaded. virtual Status ProcessSplit() = 0; /// Transfers the ownership of memory backing returned tuples such as IO buffers @@ -143,9 +144,15 @@ class HdfsScanner { /// that are not backing returned rows (e.g. temporary decompression buffers). 
virtual void Close(RowBatch* row_batch); - /// Only valid to call if 'add_batches_to_queue_' is true. + /// Only valid to call if the parent scan node is single-threaded. + bool eos() const { + DCHECK(!scan_node_->HasRowBatchQueue()); + return eos_; + } + + /// Only valid to call if the parent scan node is multi-threaded. RowBatch* batch() const { - DCHECK(add_batches_to_queue_); + DCHECK(scan_node_->HasRowBatchQueue()); return batch_; } @@ -177,16 +184,11 @@ class HdfsScanner { protected: /// The scan node that started this scanner - HdfsScanNode* scan_node_; + HdfsScanNodeBase* scan_node_; /// RuntimeState for error reporting RuntimeState* state_; - /// True if the creator of this scanner intends to use ProcessSplit() and not GetNext). - /// Row batches will be added to the scan node's row batch queue, including the final - /// one in Close(). - const bool add_batches_to_queue_; - /// Context for this scanner ScannerContext* context_; @@ -196,14 +198,24 @@ class HdfsScanner { /// The first stream for context_ ScannerContext::Stream* stream_; + /// Set if this scanner has processed all ranges and will not produce more rows. + /// Only relevant when calling the GetNext() interface. + bool eos_; + /// Clones of the conjuncts ExprContexts in scan_node_->conjuncts_map(). Each scanner /// has its own ExprContexts so the conjuncts can be safely evaluated in parallel. - HdfsScanNode::ConjunctsMap scanner_conjuncts_map_; + HdfsScanNodeBase::ConjunctsMap scanner_conjuncts_map_; // Convenience reference to scanner_conjuncts_map_[scan_node_->tuple_idx()] for scanners // that do not support nested types. const std::vector<ExprContext*>* scanner_conjunct_ctxs_; + /// Holds memory for template tuples. The memory in this pool must remain valid as long + /// as the row batches produced by this scanner. This typically means that the + /// ownership is transferred to the last row batch in Close(). 
Some scanners transfer + /// the ownership to the parent scan node instead due to being closed multiple times. + boost::scoped_ptr<MemPool> template_tuple_pool_; + /// A template tuple is a partially-materialized tuple with only partition key slots set /// (or other default values, such as NULL for columns missing in a file). The other /// slots are set to NULL. The template tuple must be copied into output tuples before /// @@ -211,9 +223,8 @@ /// /// Each tuple descriptor (i.e. scan_node_->tuple_desc() and any collection item tuple /// descs) has a template tuple, or NULL if there are no partition key or default slots. - /// Template tuples are computed once for each file and valid for the duration of that - /// file. They are owned by the HDFS scan node, although each scanner has its own - /// template tuples. + /// Template tuples are computed once for each file and are allocated from + /// template_tuple_pool_. std::map<const TupleDescriptor*, Tuple*> template_tuple_map_; /// Convenience variable set to the top-level template tuple @@ -221,7 +232,10 @@ Tuple* template_tuple_; /// Fixed size of each top-level tuple, in bytes - int tuple_byte_size_; + const int32_t tuple_byte_size_; + + /// Number of null bytes in the top-level tuple. + const int32_t num_null_bytes_; /// Current tuple pointer into tuple_mem_. Tuple* tuple_; @@ -239,9 +253,6 @@ /// Helper class for converting text to other types; boost::scoped_ptr<TextConverter> text_converter_; - /// Number of null bytes in the top-level tuple. - int32_t num_null_bytes_; /// Contains current parse status to minimize the number of Status objects returned. /// This significantly minimizes the cross compile dependencies for llvm since status /// objects inline a bunch of string functions. Also, status objects aren't extremely @@ -269,8 +280,8 @@ WriteTuplesFn write_tuples_fn_; /// Implements GetNext(). Should be overridden by subclasses. 
- /// May be called even if 'add_batches_to_queue_' is true. - virtual Status GetNextInternal(RowBatch* row_batch, bool* eos) { + /// Only valid to call if the parent scan node is multi-threaded. + virtual Status GetNextInternal(RowBatch* row_batch) { DCHECK(false) << "GetNextInternal() not implemented for this scanner type."; return Status::OK(); } @@ -283,7 +294,7 @@ class HdfsScanner { THdfsFileFormat::type type, const std::string& scanner_name); /// Set 'batch_' to a new row batch and update 'tuple_mem_' accordingly. - /// Only valid to call if 'add_batches_to_queue_' is true. + /// Only valid to call if the parent scan node is multi-threaded. Status StartNewRowBatch(); /// Reset internal state for a new scan range. @@ -297,7 +308,7 @@ class HdfsScanner { /// current row batch is complete and a new one is allocated). /// Memory returned from this call is invalidated after calling CommitRows(). /// Callers must call GetMemory() again after calling this function. - /// Only valid to call if 'add_batches_to_queue_' is true. + /// Only valid to call if the parent scan node is multi-threaded. int GetMemory(MemPool** pool, Tuple** tuple_mem, TupleRow** tuple_row_mem); /// Gets memory for outputting tuples into the CollectionValue being constructed via @@ -316,15 +327,15 @@ class HdfsScanner { /// Returns Status::OK if the query is not cancelled and hasn't exceeded any mem limits. /// Scanner can call this with 0 rows to flush any pending resources (attached pools /// and io buffers) to minimize memory consumption. - /// Only valid to call if 'add_batches_to_queue_' is true. + /// Only valid to call if the parent scan node is multi-threaded. Status CommitRows(int num_rows); /// Release all memory in 'pool' to batch_. If commit_batch is true, the row batch /// will be committed. commit_batch should be true if the attached pool is expected /// to be non-trivial (i.e. a decompression buffer) to minimize scanner mem usage. 
- /// Only valid to call if 'add_batches_to_queue_' is true. + /// Only valid to call if the parent scan node is multi-threaded. void AttachPool(MemPool* pool, bool commit_batch) { - DCHECK(add_batches_to_queue_); + DCHECK(scan_node_->HasRowBatchQueue()); DCHECK(batch_ != NULL); DCHECK(pool != NULL); batch_->tuple_data_pool()->AcquireData(pool, false); @@ -359,7 +370,7 @@ class HdfsScanner { /// Returns the number of tuples added to the row batch. This can be less than /// num_tuples/tuples_till_limit because of failed conjuncts. /// Returns -1 if parsing should be aborted due to parse errors. - /// Only valid to call if 'add_batches_to_queue_' is true. + /// Only valid to call if the parent scan node is multi-threaded. int WriteAlignedTuples(MemPool* pool, TupleRow* tuple_row_mem, int row_size, FieldLocation* fields, int num_tuples, int max_added_tuples, int slots_per_tuple, int row_start_indx); @@ -410,7 +421,7 @@ class HdfsScanner { /// Codegen function to replace WriteCompleteTuple. Should behave identically /// to WriteCompleteTuple. Stores the resulting function in 'write_complete_tuple_fn' /// if codegen was successful or NULL otherwise. - static Status CodegenWriteCompleteTuple(HdfsScanNode*, LlvmCodeGen*, + static Status CodegenWriteCompleteTuple(HdfsScanNodeBase* node, LlvmCodeGen* codegen, const std::vector<ExprContext*>& conjunct_ctxs, llvm::Function** write_complete_tuple_fn); @@ -418,7 +429,7 @@ class HdfsScanner { /// compiled to IR. This function loads the precompiled IR function, modifies it, /// and stores the resulting function in 'write_aligned_tuples_fn' if codegen was /// successful or NULL otherwise. - static Status CodegenWriteAlignedTuples(HdfsScanNode*, LlvmCodeGen*, + static Status CodegenWriteAlignedTuples(HdfsScanNodeBase*, LlvmCodeGen*, llvm::Function* write_tuple_fn, llvm::Function** write_aligned_tuples_fn); /// Report parse error for column @ desc. 
If abort_on_error is true, sets http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-scanner.cc b/be/src/exec/hdfs-sequence-scanner.cc index ec52901..03a07ce 100644 --- a/be/src/exec/hdfs-sequence-scanner.cc +++ b/be/src/exec/hdfs-sequence-scanner.cc @@ -41,9 +41,9 @@ const uint8_t HdfsSequenceScanner::SEQFILE_VERSION_HEADER[4] = {'S', 'E', 'Q', 6 #define RETURN_IF_FALSE(x) if (UNLIKELY(!(x))) return parse_status_ -HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state, - bool add_batches_to_queue) - : BaseSequenceScanner(scan_node, state, add_batches_to_queue), +HdfsSequenceScanner::HdfsSequenceScanner(HdfsScanNodeBase* scan_node, + RuntimeState* state) + : BaseSequenceScanner(scan_node, state), unparsed_data_buffer_(NULL), num_buffered_records_in_compressed_block_(0) { } @@ -52,7 +52,7 @@ HdfsSequenceScanner::~HdfsSequenceScanner() { } // Codegen for materialized parsed data into tuples. 
-Status HdfsSequenceScanner::Codegen(HdfsScanNode* node, +Status HdfsSequenceScanner::Codegen(HdfsScanNodeBase* node, const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) { *write_aligned_tuples_fn = NULL; if (!node->runtime_state()->codegen_enabled()) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-sequence-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-sequence-scanner.h b/be/src/exec/hdfs-sequence-scanner.h index e2dcee9..5d6074f 100644 --- a/be/src/exec/hdfs-sequence-scanner.h +++ b/be/src/exec/hdfs-sequence-scanner.h @@ -162,8 +162,7 @@ class HdfsSequenceScanner : public BaseSequenceScanner { /// SeqFile file: {'S', 'E', 'Q', 6} static const uint8_t SEQFILE_VERSION_HEADER[4]; - HdfsSequenceScanner(HdfsScanNode* scan_node, RuntimeState* state, - bool add_batches_to_queue); + HdfsSequenceScanner(HdfsScanNodeBase* scan_node, RuntimeState* state); virtual ~HdfsSequenceScanner(); @@ -172,7 +171,8 @@ class HdfsSequenceScanner : public BaseSequenceScanner { /// Codegen WriteAlignedTuples(). Stores the resulting function in /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise. - static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs, + static Status Codegen(HdfsScanNodeBase* node, + const std::vector<ExprContext*>& conjunct_ctxs, llvm::Function** write_aligned_tuples_fn); protected: http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index f1c80b0..f3f0a7c 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -50,9 +50,8 @@ const string HdfsTextScanner::LZO_INDEX_SUFFIX = ".index"; // progress. 
const int64_t COMPRESSED_DATA_FIXED_READ_SIZE = 1 * 1024 * 1024; -HdfsTextScanner::HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state, - bool add_batches_to_queue) - : HdfsScanner(scan_node, state, add_batches_to_queue), +HdfsTextScanner::HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* state) + : HdfsScanner(scan_node, state), byte_buffer_ptr_(NULL), byte_buffer_end_(NULL), byte_buffer_read_size_(0), @@ -67,7 +66,7 @@ HdfsTextScanner::HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state, HdfsTextScanner::~HdfsTextScanner() { } -Status HdfsTextScanner::IssueInitialRanges(HdfsScanNode* scan_node, +Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, const vector<HdfsFileDesc*>& files) { vector<DiskIoMgr::ScanRange*> compressed_text_scan_ranges; int compressed_text_files = 0; @@ -108,7 +107,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNode* scan_node, // Populate the list of compressed text scan ranges. DCHECK_GT(files[i]->file_length, 0); ScanRangeMetadata* metadata = - reinterpret_cast<ScanRangeMetadata*>(split->meta_data()); + static_cast<ScanRangeMetadata*>(split->meta_data()); DiskIoMgr::ScanRange* file_range = scan_node->AllocateScanRange( files[i]->fs, files[i]->filename.c_str(), files[i]->file_length, 0, metadata->partition_id, split->disk_id(), split->try_cache(), @@ -149,7 +148,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNode* scan_node, } Status HdfsTextScanner::ProcessSplit() { - DCHECK(add_batches_to_queue_); + DCHECK(scan_node_->HasRowBatchQueue()); // Reset state for new scan range RETURN_IF_ERROR(InitNewRange()); @@ -187,12 +186,19 @@ void HdfsTextScanner::Close(RowBatch* row_batch) { decompressor_.reset(NULL); } if (row_batch != NULL) { + row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false); row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false); row_batch->tuple_data_pool()->AcquireData(boundary_pool_.get(), false); 
context_->ReleaseCompletedResources(row_batch, true); - if (add_batches_to_queue_) scan_node_->AddMaterializedRowBatch(row_batch); + if (scan_node_->HasRowBatchQueue()) { + static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(row_batch); + } + } else { + if (template_tuple_pool_.get() != NULL) template_tuple_pool_->FreeAll(); } + // Verify all resources (if any) have been transferred. + DCHECK_EQ(template_tuple_pool_.get()->total_allocated_bytes(), 0); DCHECK_EQ(data_buffer_pool_.get()->total_allocated_bytes(), 0); DCHECK_EQ(boundary_pool_.get()->total_allocated_bytes(), 0); DCHECK_EQ(context_->num_completed_io_buffers(), 0); @@ -687,7 +693,7 @@ Status HdfsTextScanner::CheckForSplitDelimiter(bool* split_delimiter) { // Codegen for materializing parsed data into tuples. The function WriteCompleteTuple is // codegen'd using the IRBuilder for the specific tuple description. This function // is then injected into the cross-compiled driving function, WriteAlignedTuples(). -Status HdfsTextScanner::Codegen(HdfsScanNode* node, +Status HdfsTextScanner::Codegen(HdfsScanNodeBase* node, const vector<ExprContext*>& conjunct_ctxs, Function** write_aligned_tuples_fn) { *write_aligned_tuples_fn = NULL; if (!node->runtime_state()->codegen_enabled()) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-text-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.h b/be/src/exec/hdfs-text-scanner.h index 937626f..e68d45f 100644 --- a/be/src/exec/hdfs-text-scanner.h +++ b/be/src/exec/hdfs-text-scanner.h @@ -47,8 +47,7 @@ struct HdfsFileDesc; /// scanner for the tuple directly after it. class HdfsTextScanner : public HdfsScanner { public: - HdfsTextScanner(HdfsScanNode* scan_node, RuntimeState* state, - bool add_batches_to_queue); + HdfsTextScanner(HdfsScanNodeBase* scan_node, RuntimeState* state); virtual ~HdfsTextScanner(); /// Implementation of HdfsScanner interface. 
@@ -57,12 +56,13 @@ class HdfsTextScanner : public HdfsScanner { virtual void Close(RowBatch* row_batch); /// Issue io manager byte ranges for 'files'. - static Status IssueInitialRanges(HdfsScanNode* scan_node, + static Status IssueInitialRanges(HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files); /// Codegen WriteAlignedTuples(). Stores the resulting function in /// 'write_aligned_tuples_fn' if codegen was successful or NULL otherwise. - static Status Codegen(HdfsScanNode*, const std::vector<ExprContext*>& conjunct_ctxs, + static Status Codegen(HdfsScanNodeBase* node, + const std::vector<ExprContext*>& conjunct_ctxs, llvm::Function** write_aligned_tuples_fn); /// Suffix for lzo index files. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index 69d682e..033ab82 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -19,6 +19,7 @@ #include <gutil/strings/substitute.h> +#include "exec/hdfs-scan-node-base.h" #include "exec/hdfs-scan-node.h" #include "runtime/row-batch.h" #include "runtime/mem-pool.h" @@ -40,7 +41,7 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024; // output_buffer_bytes_left_ will be set to something else. 
static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0; -ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNode* scan_node, +ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node, HdfsPartitionDescriptor* partition_desc, DiskIoMgr::ScanRange* scan_range, const vector<FilterContext>& filter_ctxs) : state_(state), @@ -311,7 +312,8 @@ Status ScannerContext::Stream::GetBytesInternal(int64_t requested_len, } bool ScannerContext::cancelled() const { - return scan_node_->done_; + if (!scan_node_->HasRowBatchQueue()) return false; + return static_cast<HdfsScanNode*>(scan_node_)->done(); } Status ScannerContext::Stream::ReportIncompleteRead(int64_t length, int64_t bytes_read) { http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/scanner-context.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h index 9178324..1f1bc0d 100644 --- a/be/src/exec/scanner-context.h +++ b/be/src/exec/scanner-context.h @@ -31,7 +31,7 @@ namespace impala { struct HdfsFileDesc; class HdfsPartitionDescriptor; -class HdfsScanNode; +class HdfsScanNodeBase; class MemPool; class RowBatch; class RuntimeState; @@ -55,12 +55,14 @@ class TupleRow; /// from processing the bytes. This is the consumer. /// 3. The scan node/main thread which calls into the context to trigger cancellation /// or other end of stream conditions. +/// TODO: Some of the synchronization mechanisms such as cancelled() can be removed +/// once the legacy hdfs scan node has been removed. class ScannerContext { public: /// Create a scanner context with the parent scan_node (where materialized row batches /// get pushed to) and the scan range to process. /// This context starts with 1 stream. 
- ScannerContext(RuntimeState*, HdfsScanNode*, HdfsPartitionDescriptor*, + ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*, DiskIoMgr::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs); /// Destructor verifies that all stream objects have been released. @@ -295,7 +297,8 @@ class ScannerContext { /// The stream is created in the runtime state's object pool Stream* AddStream(DiskIoMgr::ScanRange* range); - /// If true, the ScanNode has been cancelled and the scanner thread should finish up + /// Returns true if scan_node_ is multi-threaded and has been cancelled. + /// Always returns false if the scan_node_ is not multi-threaded. bool cancelled() const; int num_completed_io_buffers() const { return num_completed_io_buffers_; } @@ -306,7 +309,7 @@ class ScannerContext { friend class Stream; RuntimeState* state_; - HdfsScanNode* scan_node_; + HdfsScanNodeBase* scan_node_; HdfsPartitionDescriptor* partition_desc_; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exprs/expr-context.h ---------------------------------------------------------------------- diff --git a/be/src/exprs/expr-context.h b/be/src/exprs/expr-context.h index 3c5d5af..09de6a5 100644 --- a/be/src/exprs/expr-context.h +++ b/be/src/exprs/expr-context.h @@ -110,8 +110,9 @@ class ExprContext { return fn_contexts_[i]; } - Expr* root() { return root_; } - bool closed() { return closed_; } + Expr* root() const { return root_; } + bool closed() const { return closed_; } + bool is_clone() const { return is_clone_; } /// Calls Get*Val on root_ BooleanVal GetBooleanVal(TupleRow* row); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/runtime/tuple.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h index 10811ea..3a3e399 100644 --- a/be/src/runtime/tuple.h +++ b/be/src/runtime/tuple.h @@ -19,7 +19,6 @@ #ifndef IMPALA_RUNTIME_TUPLE_H 
#define IMPALA_RUNTIME_TUPLE_H -#include <cstring> #include "codegen/impala-ir.h" #include "common/logging.h" #include "gutil/macros.h" @@ -70,9 +69,7 @@ class Tuple { return result; } - void Init(int size) { - bzero(this, size); - } + void Init(int size) { memset(this, 0, size); } /// The total size of all data represented in this tuple (tuple data and referenced /// string and collection data). http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/common/thrift/ImpalaInternalService.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift index f4b070f..bf03d98 100644 --- a/common/thrift/ImpalaInternalService.thrift +++ b/common/thrift/ImpalaInternalService.thrift @@ -183,10 +183,9 @@ struct TQueryOptions { 43: optional TParquetFallbackSchemaResolution parquet_fallback_schema_resolution = 0 // Multi-threaded execution: number of cores per query per node. - // > 1: multi-threaded execution mode, with given number of cores - // 1: single-threaded execution mode - // 0: multi-threaded execution mode, number of cores is the pool default - 44: optional i32 mt_num_cores = 1 + // > 0: multi-threaded execution mode, with given number of cores + // 0: single-threaded execution mode + 44: optional i32 mt_num_cores = 0 // If true, INSERT writes to S3 go directly to their final location rather than being // copied there by the coordinator. 
We cannot do this for INSERT OVERWRITES because for http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/fe/src/main/java/com/cloudera/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/com/cloudera/impala/service/Frontend.java b/fe/src/main/java/com/cloudera/impala/service/Frontend.java index 83af818..2d38396 100644 --- a/fe/src/main/java/com/cloudera/impala/service/Frontend.java +++ b/fe/src/main/java/com/cloudera/impala/service/Frontend.java @@ -956,7 +956,7 @@ public class Frontend { LOG.debug("create plan"); Planner planner = new Planner(analysisResult, queryCtx); if (RuntimeEnv.INSTANCE.isTestEnv() - && queryCtx.request.query_options.mt_num_cores != 1) { + && queryCtx.request.query_options.mt_num_cores > 0) { // TODO: this is just to be able to run tests; implement this List<PlanFragment> planRoots = planner.createParallelPlans(); for (PlanFragment planRoot: planRoots) {
