http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h new file mode 100644 index 0000000..7ea4b9d --- /dev/null +++ b/be/src/exec/hdfs-scan-node-base.h @@ -0,0 +1,459 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef IMPALA_EXEC_HDFS_SCAN_NODE_BASE_H_ +#define IMPALA_EXEC_HDFS_SCAN_NODE_BASE_H_ + +#include <stdint.h> +#include <memory> +#include <unordered_set> +#include <vector> + +#include <boost/unordered_map.hpp> +#include <boost/scoped_ptr.hpp> + +#include "exec/filter-context.h" +#include "exec/scan-node.h" +#include "runtime/descriptors.h" +#include "runtime/disk-io-mgr.h" +#include "util/avro-util.h" +#include "util/progress-updater.h" +#include "util/spinlock.h" + +namespace impala { + +class ScannerContext; +class DescriptorTbl; +class HdfsScanner; +class RowBatch; +class Status; +class Tuple; +class TPlanNode; +class TScanRange; + +/// Maintains per file information for files assigned to this scan node. This includes +/// all the splits for the file. Note that it is not thread-safe. +struct HdfsFileDesc { + HdfsFileDesc(const std::string& filename) + : fs(NULL), filename(filename), file_length(0), mtime(0), + file_compression(THdfsCompression::NONE) { + } + + /// Connection to the filesystem containing the file. + hdfsFS fs; + + /// File name including the path. + std::string filename; + + /// Length of the file. This is not related to which parts of the file have been + /// assigned to this node. + int64_t file_length; + + /// Last modified time + int64_t mtime; + + THdfsCompression::type file_compression; + + /// Splits (i.e. raw byte ranges) for this file, assigned to this scan node. + std::vector<DiskIoMgr::ScanRange*> splits; +}; + +/// Struct for additional metadata for scan ranges. This contains the partition id +/// that this scan range is for. +struct ScanRangeMetadata { + /// The partition id that this range is part of. + int64_t partition_id; + + /// For parquet scan ranges we initially create a request for the file footer for each + /// split; we store a pointer to the actual split so that we can recover its information + /// for the scanner to process. + const DiskIoMgr::ScanRange* original_split; + + ScanRangeMetadata(int64_t partition_id, const DiskIoMgr::ScanRange* original_split) + : partition_id(partition_id), original_split(original_split) { } +}; + + +/// Base class for all Hdfs scan nodes. 
Contains common members and functions
+/// that are independent of whether batches are materialized by the main thread
+/// (via HdfsScanner::GetNext()) or by spinning up separate threads that feed
+/// into a RowBatch queue (via HdfsScanner::ProcessSplit()). Those specifics
+/// are expected to be implemented in subclasses.
+///
+/// Subclasses may expect to receive runtime filters produced elsewhere in the plan
+/// (even from remote fragments). These filters arrive asynchronously during execution,
+/// and are applied as soon as they arrive. Filters may be applied by the scan node in
+/// the following scopes:
+///
+/// 1. Per-file (all file formats, partition column filters only) - filtering at this
+/// scope saves IO as the filters are applied before scan ranges are issued.
+/// 2. Per-scan-range (all file formats, partition column filters only) - filtering at
+/// this scope saves CPU as filtered scan ranges are never scanned.
+///
+/// Scanners may also use the same filters to eliminate rows at finer granularities
+/// (e.g. per row).
+///
+/// TODO: Revisit and minimize metrics. Move those specific to legacy multi-threaded
+/// scans into HdfsScanNode.
+/// TODO: Once the legacy scan node has been removed, several functions can be made
+/// non-virtual. Also merge this class with HdfsScanNodeMt.
+class HdfsScanNodeBase : public ScanNode {
+ public:
+  HdfsScanNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
+  ~HdfsScanNodeBase();
+
+  virtual Status Init(const TPlanNode& tnode, RuntimeState* state);
+  virtual Status Prepare(RuntimeState* state);
+  virtual Status Open(RuntimeState* state);
+  virtual Status Reset(RuntimeState* state);
+  virtual void Close(RuntimeState* state);
+
+  /// Returns true if this node uses separate threads for scanners that append RowBatches
+  /// to a queue, false otherwise.
+  virtual bool HasRowBatchQueue() const = 0;
+
+  int limit() const { return limit_; }
+
+  const std::vector<SlotDescriptor*>& materialized_slots()
+      const { return materialized_slots_; }
+
+  /// Returns the tuple idx into the row for this scan node to output to.
+  /// Currently this is always 0.
+  int tuple_idx() const { return 0; }
+
+  /// Returns number of partition keys in the table.
+  int num_partition_keys() const { return hdfs_table_->num_clustering_cols(); }
+
+  /// Returns number of partition key slots.
+  int num_materialized_partition_keys() const { return partition_key_slots_.size(); }
+
+  const TupleDescriptor* tuple_desc() const { return tuple_desc_; }
+  const HdfsTableDescriptor* hdfs_table() { return hdfs_table_; }
+  const AvroSchemaElement& avro_schema() { return *avro_schema_.get(); }
+  RuntimeState* runtime_state() { return runtime_state_; }
+  int skip_header_line_count() const { return skip_header_line_count_; }
+  DiskIoRequestContext* reader_context() { return reader_context_; }
+
+  typedef std::map<TupleId, std::vector<ExprContext*>> ConjunctsMap;
+  const ConjunctsMap& conjuncts_map() const { return conjuncts_map_; }
+
+  RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length() {
+    return max_compressed_text_file_length_;
+  }
+
+  const static int SKIP_COLUMN = -1;
+
+  /// Returns index into materialized_slots with 'path'. Returns SKIP_COLUMN if
+  /// that path is not materialized.
+  int GetMaterializedSlotIdx(const std::vector<int>& path) const {
+    PathToSlotIdxMap::const_iterator result = path_to_materialized_slot_idx_.find(path);
+    if (result == path_to_materialized_slot_idx_.end()) return SKIP_COLUMN;
+    return result->second;
+  }
+
+  /// The result array is of length hdfs_table_->num_cols(). The i-th element is true iff
+  /// column i should be materialized.
+  const bool* is_materialized_col() {
+    return reinterpret_cast<const bool*>(&is_materialized_col_[0]);
+  }
+
+  /// Returns the per format codegen'd function. Scanners call this to get the
+  /// codegen'd function to use. Returns NULL if codegen should not be used.
+  void* GetCodegenFn(THdfsFileFormat::type);
+
+  inline void IncNumScannersCodegenEnabled() { num_scanners_codegen_enabled_.Add(1); }
+  inline void IncNumScannersCodegenDisabled() { num_scanners_codegen_disabled_.Add(1); }
+
+  /// Allocates a new scan range object, stored in the runtime state's object pool. For
+  /// scan ranges that correspond to the original hdfs splits, the partition id must be
+  /// set to the range's partition id. For other ranges (e.g. columns in parquet, read
+  /// past buffers), the partition_id is unused. expected_local should be true if this
+  /// scan range is not expected to require a remote read. The range must fall within
+  /// the file bounds. That is, the offset must be >= 0, and offset + len <= file_length.
+  /// If not NULL, the 'original_split' pointer is stored for reference in the scan range
+  /// metadata of the scan range that is to be allocated.
+  /// This is thread safe.
+  DiskIoMgr::ScanRange* AllocateScanRange(
+      hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id,
+      int disk_id, bool try_cache, bool expected_local, int64_t mtime,
+      const DiskIoMgr::ScanRange* original_split = NULL);
+
+  /// Adds ranges to the io mgr queue. 'num_files_queued' indicates how many files' scan
+  /// ranges have been added completely. A file's scan ranges are added completely if no
+  /// new scanner threads will be needed to process that file besides the additional
+  /// threads needed to process those in 'ranges'.
+  /// Can be overridden to add scan-node specific actions like starting scanner threads.
+  virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges,
+      int num_files_queued);
+
+  /// Adds all splits for file_desc to the io mgr queue and indicates one file has
+  /// been added completely.
+  inline Status AddDiskIoRanges(const HdfsFileDesc* file_desc) {
+    return AddDiskIoRanges(file_desc->splits, 1);
+  }
+
+  /// Allocates and initializes a new template tuple allocated from pool with values
+  /// from the partition columns for the current scan range, if any.
+  /// Returns NULL if there are no partition key slots.
+  Tuple* InitTemplateTuple(const std::vector<ExprContext*>& value_ctxs,
+      MemPool* pool, RuntimeState* state) const;
+
+  /// Returns the file desc for 'filename'. Returns NULL if filename is invalid.
+  HdfsFileDesc* GetFileDesc(const std::string& filename);
+
+  /// Called by scanners when a range is complete. Used to record progress.
+  /// This *must* only be called after a scanner has completely finished its
+  /// scan range (i.e. context->Flush()), and has returned the final row batch.
+  /// Otherwise, scan nodes using a RowBatch queue may lose the last batch due
+  /// to racing with shutting down the queue.
+ void RangeComplete(const THdfsFileFormat::type& file_type, + const THdfsCompression::type& compression_type); + /// Same as above except for when multiple compression codecs were used + /// in the file. The metrics are incremented for each compression_type. + virtual void RangeComplete(const THdfsFileFormat::type& file_type, + const std::vector<THdfsCompression::type>& compression_type); + + /// Utility function to compute the order in which to materialize slots to allow for + /// computing conjuncts as slots get materialized (on partial tuples). + /// 'order' will contain for each slot, the first conjunct it is associated with. + /// e.g. order[2] = 1 indicates materialized_slots[2] must be materialized before + /// evaluating conjuncts[1]. Slots that are not referenced by any conjuncts will have + /// order set to conjuncts.size() + void ComputeSlotMaterializationOrder(std::vector<int>* order) const; + + /// Returns true if there are no materialized slots, such as a count(*) over the table. + inline bool IsZeroSlotTableScan() const { + return materialized_slots().empty() && tuple_desc()->tuple_path().empty(); + } + + /// map from volume id to <number of split, per volume split lengths> + typedef boost::unordered_map<int32_t, std::pair<int, int64_t>> PerVolumnStats; + + /// Update the per volume stats with the given scan range params list + static void UpdateHdfsSplitStats( + const std::vector<TScanRangeParams>& scan_range_params_list, + PerVolumnStats* per_volume_stats); + + /// Output the per_volume_stats to stringstream. The output format is a list of: + /// <volume id>:<# splits>/<per volume split lengths> + static void PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats, + std::stringstream* ss); + + /// Description string for the per volume stats output. + static const std::string HDFS_SPLIT_STATS_DESC; + + /// Returns true if partition 'partition_id' passes all the filter predicates in + /// 'filter_ctxs' and should not be filtered out. 'stats_name' is the key of one of the + /// counter groups in FilterStats, and is used to update the correct statistics. + /// + /// 'filter_ctxs' is either an empty list, in which case filtering is disabled and the + /// function returns true, or a set of filter contexts to evaluate. + bool PartitionPassesFilters(int32_t partition_id, const std::string& stats_name, + const std::vector<FilterContext>& filter_ctxs); + + const std::vector<FilterContext>& filter_ctxs() const { return filter_ctxs_; } + + protected: + friend class ScannerContext; + friend class HdfsScanner; + + RuntimeState* runtime_state_; + + // Number of header lines to skip at the beginning of each file of this table. Only set + // to values > 0 for hdfs text files. + const int skip_header_line_count_; + + /// Tuple id resolved in Prepare() to set tuple_desc_ + const int tuple_id_; + + /// RequestContext object to use with the disk-io-mgr for reads. + DiskIoRequestContext* reader_context_; + + /// Descriptor for tuples this scan node constructs + const TupleDescriptor* tuple_desc_; + + /// Map from partition ID to a template tuple (owned by scan_node_pool_) which has only + /// the partition columns for that partition materialized. Used to filter files and scan + /// ranges on partition-column filters. Populated in Prepare(). + boost::unordered_map<int64_t, Tuple*> partition_template_tuple_map_; + + /// Descriptor for the hdfs table, including partition and format metadata. 
+ /// Set in Prepare, owned by RuntimeState + const HdfsTableDescriptor* hdfs_table_; + + /// The root of the table's Avro schema, if we're scanning an Avro table. + ScopedAvroSchemaElement avro_schema_; + + /// If true, the warning that some disk ids are unknown was logged. Only log this once + /// per scan node since it can be noisy. + bool unknown_disk_id_warned_; + + /// Partitions scanned by this scan node. + std::unordered_set<int64_t> partition_ids_; + + /// File path => file descriptor (which includes the file's splits) + typedef std::map<std::string, HdfsFileDesc*> FileDescMap; + FileDescMap file_descs_; + + /// File format => file descriptors. + typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>> FileFormatsMap; + FileFormatsMap per_type_files_; + + /// Conjuncts for each materialized tuple (top-level row batch tuples and collection + /// item tuples). Includes a copy of ExecNode.conjuncts_. + ConjunctsMap conjuncts_map_; + + /// Set to true when the initial scan ranges are issued to the IoMgr. This happens on + /// the first call to GetNext(). The token manager, in a different thread, will read + /// this variable. + bool initial_ranges_issued_; + + /// Number of files that have not been issued from the scanners. + AtomicInt32 num_unqueued_files_; + + /// Per scanner type codegen'd fn. + typedef std::map<THdfsFileFormat::type, void*> CodegendFnMap; + CodegendFnMap codegend_fn_map_; + + /// Maps from a slot's path to its index into materialized_slots_. + typedef boost::unordered_map<std::vector<int>, int> PathToSlotIdxMap; + PathToSlotIdxMap path_to_materialized_slot_idx_; + + /// List of contexts for expected runtime filters for this scan node. These contexts are + /// cloned by individual scanners to be used in multi-threaded contexts, passed through + /// the per-scanner ScannerContext.. + std::vector<FilterContext> filter_ctxs_; + + /// is_materialized_col_[i] = <true i-th column should be materialized, false otherwise> + /// for 0 <= i < total # columns in table + // + /// This should be a vector<bool>, but bool vectors are special-cased and not stored + /// internally as arrays, so instead we store as chars and cast to bools as needed + std::vector<char> is_materialized_col_; + + /// Vector containing slot descriptors for all non-partition key slots. These + /// descriptors are sorted in order of increasing col_pos. + std::vector<SlotDescriptor*> materialized_slots_; + + /// Vector containing slot descriptors for all partition key slots. + std::vector<SlotDescriptor*> partition_key_slots_; + + /// Keeps track of total splits and the number finished. + ProgressUpdater progress_; + + /// Counters which track the number of scanners that have codegen enabled for the + /// materialize and conjuncts evaluation code paths. + AtomicInt32 num_scanners_codegen_enabled_; + AtomicInt32 num_scanners_codegen_disabled_; + + /// This is the number of io buffers that are owned by the scan node and the scanners. + /// This is used just to help debug leaked io buffers to determine if the leak is + /// happening in the scanners vs other parts of the execution. + /// TODO: Remove this counter when deprecating the multi-threaded scan node. + AtomicInt32 num_owned_io_buffers_; + + /// If true, counters are actively running and need to be reported in the runtime + /// profile. + bool counters_running_; + + /// The size of the largest compressed text file to be scanned. This is used to + /// estimate scanner thread memory usage. 
+  RuntimeProfile::HighWaterMarkCounter* max_compressed_text_file_length_;
+
+  /// Disk accessed bitmap
+  RuntimeProfile::Counter disks_accessed_bitmap_;
+
+  /// Total number of bytes read locally
+  RuntimeProfile::Counter* bytes_read_local_;
+
+  /// Total number of bytes read via short circuit read
+  RuntimeProfile::Counter* bytes_read_short_circuit_;
+
+  /// Total number of bytes read from data node cache
+  RuntimeProfile::Counter* bytes_read_dn_cache_;
+
+  /// Total number of remote scan ranges
+  RuntimeProfile::Counter* num_remote_ranges_;
+
+  /// Total number of bytes read remotely that were expected to be local
+  RuntimeProfile::Counter* unexpected_remote_bytes_;
+
+  /// Pool for allocating memory that is shared between scanners, e.g. the partition key
+  /// tuples and their string buffers.
+  boost::scoped_ptr<MemPool> scan_node_pool_;
+
+  /// Status of failed operations. This is set in the ScannerThreads and returned in
+  /// GetNext() if an error occurred. A non-OK status triggers cleanup of the scanner
+  /// threads.
+  Status status_;
+
+  /// Mapping of file formats (file type, compression type) to the number of
+  /// splits of that type and the lock protecting it.
+  typedef std::map<
+      std::pair<THdfsFileFormat::type, THdfsCompression::type>, int> FileTypeCountsMap;
+  FileTypeCountsMap file_type_counts_;
+
+  /// Performs dynamic partition pruning, i.e., applies runtime filters to files, and
+  /// issues initial ranges for all file types. Waits for runtime filters if necessary.
+  /// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true.
+  Status IssueInitialScanRanges(RuntimeState* state);
+
+  /// Creates and opens a new scanner for this partition type.
+  /// If the scanner is successfully created, it is returned in 'scanner'.
+  /// Passes 'add_batches_to_queue' to the scanner constructor.
+  Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
+      ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner);
+
+  /// Recursively initializes all NULL collection slots to an empty CollectionValue in
+  /// addition to maintaining the null bit. Hack to allow UnnestNode to project out
+  /// collection slots. Assumes that the null bit has already been un/set.
+  /// TODO: Remove this function once the TODOs in UnnestNode regarding projection
+  /// have been addressed.
+  void InitNullCollectionValues(const TupleDescriptor* tuple_desc, Tuple* tuple) const;
+
+  /// Helper to call InitNullCollectionValues() on all tuples produced by this scan
+  /// in 'row_batch'.
+  void InitNullCollectionValues(RowBatch* row_batch) const;
+
+  /// Returns false if, according to filters in 'filter_ctxs', 'file' should be filtered
+  /// and therefore not processed. 'file_type' is the format of 'file', and is used
+  /// for bookkeeping. Returns true if all filters pass or are not present.
+  bool FilePassesFilterPredicates(const std::vector<FilterContext>& filter_ctxs,
+      const THdfsFileFormat::type& file_type, HdfsFileDesc* file);
+
+  /// Waits for up to time_ms for runtime filters to arrive, checking every 20ms. Returns
+  /// true if all filters arrived within the time limit (as measured from the time of
+  /// RuntimeFilterBank::RegisterFilter()), false otherwise.
+  bool WaitForRuntimeFilters(int32_t time_ms);
+
+  /// Stops periodic counters and aggregates counter values for the entire scan node.
+  /// This should be called as soon as the scan node is complete to get the most accurate
+  /// counter values.
+  /// This can be called multiple times; subsequent calls will be ignored.
+  /// This must be called in Close() to unregister counters.
+  /// Scan nodes with a RowBatch queue may have to synchronize calls to this function.
+  void StopAndFinalizeCounters();
+
+  /// Calls ExecDebugAction(). Returns the status based on the debug action specified
+  /// for the query.
+  Status TriggerDebugAction();
+};
+
+}
+
+#endif
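For context on the ComputeSlotMaterializationOrder() contract documented in the header above, the following standalone sketch shows the ordering rule it describes: order[i] is the index of the first conjunct that references materialized slot i, or conjuncts.size() if no conjunct references it. The slot/conjunct data below is invented for illustration and is not taken from this patch or from Impala's implementation.

// Standalone illustration of the slot materialization order contract documented
// for HdfsScanNodeBase::ComputeSlotMaterializationOrder(). The slot/conjunct
// layout here is hypothetical example data.
#include <iostream>
#include <vector>

int main() {
  const int num_slots = 4;
  // conjunct_slots[c] lists the materialized-slot indexes referenced by conjunct c.
  const std::vector<std::vector<int>> conjunct_slots = {{1}, {2, 3}};

  // Initialize every slot's order to conjuncts.size(), i.e. "needed by no conjunct".
  std::vector<int> order(num_slots, conjunct_slots.size());
  for (int c = 0; c < static_cast<int>(conjunct_slots.size()); ++c) {
    for (int slot : conjunct_slots[c]) {
      // Keep the earliest conjunct index seen for this slot.
      if (c < order[slot]) order[slot] = c;
    }
  }

  for (int i = 0; i < num_slots; ++i) std::cout << "order[" << i << "]=" << order[i] << " ";
  std::cout << std::endl;
  return 0;
}

Compiled with any C++11 compiler, this prints order[0]=2 order[1]=0 order[2]=1 order[3]=1, which matches the example given in the header comment (order[2] = 1 means slot 2 must be materialized before evaluating conjunct 1).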
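Similarly, the per-volume split statistics described for UpdateHdfsSplitStats()/PrintHdfsSplitStats() boil down to a map from volume id to (split count, total split length), printed as <volume id>:<# splits>/<split lengths>. The following standalone sketch uses made-up split sizes to show that shape; the real code may format the lengths differently (e.g. pretty-printed byte units).

// Sketch of the per-volume split stats map and its documented output format.
// The volume ids and split lengths are invented sample values.
#include <cstdint>
#include <iostream>
#include <map>
#include <sstream>
#include <utility>

int main() {
  // volume id -> (number of splits, total bytes of those splits)
  std::map<int32_t, std::pair<int, int64_t>> per_volume_stats;

  // Pretend three scan ranges were assigned: two on volume 0, one on volume 1.
  auto add_split = [&](int32_t volume_id, int64_t len) {
    auto& entry = per_volume_stats[volume_id];
    ++entry.first;
    entry.second += len;
  };
  add_split(0, 64LL * 1024 * 1024);
  add_split(0, 32LL * 1024 * 1024);
  add_split(1, 128LL * 1024 * 1024);

  std::stringstream ss;
  for (const auto& v : per_volume_stats) {
    ss << v.first << ":" << v.second.first << "/" << v.second.second << " ";
  }
  // Prints: 0:2/100663296 1:1/134217728
  std::cout << ss.str() << std::endl;
  return 0;
}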
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
new file mode 100644
index 0000000..8c61326
--- /dev/null
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs-scan-node-mt.h"
+
+#include <sstream>
+
+#include "runtime/runtime-state.h"
+#include "runtime/row-batch.h"
+#include "util/debug-util.h"
+#include "util/runtime-profile-counters.h"
+
+#include "gen-cpp/PlanNodes_types.h"
+
+using std::stringstream;
+
+namespace impala {
+
+HdfsScanNodeMt::HdfsScanNodeMt(ObjectPool* pool, const TPlanNode& tnode,
+    const DescriptorTbl& descs)
+    : HdfsScanNodeBase(pool, tnode, descs),
+      scan_range_(NULL),
+      scanner_(NULL) {
+}
+
+HdfsScanNodeMt::~HdfsScanNodeMt() {
+}
+
+Status HdfsScanNodeMt::Prepare(RuntimeState* state) {
+  RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
+  // Return an error if this scan node has been assigned a range that is not supported
+  // because the scanner of the corresponding file format does not implement GetNext().
+ for (const auto& files: per_type_files_) { + if (!files.second.empty() && files.first != THdfsFileFormat::PARQUET) { + stringstream msg; + msg << "Unsupported file format with HdfsScanNodeMt: " << files.first; + return Status(msg.str()); + } + } + return Status::OK(); +} + +Status HdfsScanNodeMt::Open(RuntimeState* state) { + SCOPED_TIMER(runtime_profile_->total_time_counter()); + RETURN_IF_ERROR(HdfsScanNodeBase::Open(state)); + DCHECK(!initial_ranges_issued_); + RETURN_IF_ERROR(IssueInitialScanRanges(state)); + return Status::OK(); +} + +Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { + SCOPED_TIMER(runtime_profile_->total_time_counter()); + RETURN_IF_ERROR(ExecDebugAction(TExecNodePhase::GETNEXT, state)); + RETURN_IF_CANCELLED(state); + RETURN_IF_ERROR(QueryMaintenance(state)); + + DCHECK(scan_range_ == NULL || scanner_ != NULL); + if (scan_range_ == NULL || scanner_->eos()) { + if (scanner_ != NULL && scanner_->eos()) { + scanner_->Close(row_batch); + scanner_.reset(); + } + RETURN_IF_ERROR( + runtime_state_->io_mgr()->GetNextRange(reader_context_, &scan_range_)); + if (scan_range_ == NULL) { + *eos = true; + StopAndFinalizeCounters(); + return Status::OK(); + } + ScanRangeMetadata* metadata = + static_cast<ScanRangeMetadata*>(scan_range_->meta_data()); + int64_t partition_id = metadata->partition_id; + HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id); + scanner_ctx_.reset(new ScannerContext( + runtime_state_, this, partition, scan_range_, filter_ctxs())); + RETURN_IF_ERROR(CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_)); + } + + Status status = scanner_->GetNext(row_batch); + if (!status.ok()) { + scanner_->Close(row_batch); + scanner_.reset(); + num_owned_io_buffers_.Add(-row_batch->num_io_buffers()); + return status; + } + + num_rows_returned_ += row_batch->num_rows(); + if (ReachedLimit()) { + int num_rows_over = num_rows_returned_ - limit_; + row_batch->set_num_rows(row_batch->num_rows() - num_rows_over); + num_rows_returned_ -= num_rows_over; + scan_range_ = NULL; + scanner_->Close(row_batch); + scanner_.reset(); + *eos = true; + } + COUNTER_SET(rows_returned_counter_, num_rows_returned_); + num_owned_io_buffers_.Add(-row_batch->num_io_buffers()); + + if (*eos) StopAndFinalizeCounters(); + return Status::OK(); +} + +void HdfsScanNodeMt::Close(RuntimeState* state) { + if (is_closed()) return; + scanner_.reset(); + scanner_ctx_.reset(); + HdfsScanNodeBase::Close(state); +} + +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node-mt.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h new file mode 100644 index 0000000..7829d47 --- /dev/null +++ b/be/src/exec/hdfs-scan-node-mt.h @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef IMPALA_EXEC_HDFS_SCAN_NODE_MT_H_ +#define IMPALA_EXEC_HDFS_SCAN_NODE_MT_H_ + +#include <boost/scoped_ptr.hpp> + +#include "exec/hdfs-scanner.h" +#include "exec/hdfs-scan-node-base.h" +#include "exec/scanner-context.h" + +namespace impala { + +class DescriptorTbl; +class ObjectPool; +class RuntimeState; +class RowBatch; +class TPlanNode; + +/// Scan node that materializes tuples, evaluates conjuncts and runtime filters +/// in the thread calling GetNext(). Uses the HdfsScanner::GetNext() interface. +class HdfsScanNodeMt : public HdfsScanNodeBase { + public: + HdfsScanNodeMt(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs); + ~HdfsScanNodeMt(); + + virtual Status Prepare(RuntimeState* state); + virtual Status Open(RuntimeState* state); + virtual Status GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos); + virtual void Close(RuntimeState* state); + + virtual bool HasRowBatchQueue() const { return false; } + + private: + /// Current scan range and corresponding scanner. + DiskIoMgr::ScanRange* scan_range_; + boost::scoped_ptr<ScannerContext> scanner_ctx_; + boost::scoped_ptr<HdfsScanner> scanner_; +}; + +} + +#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/faebfebd/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index e0cee4d..83f6452 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -16,70 +16,31 @@ // under the License. 
#include "exec/hdfs-scan-node.h" -#include "exec/base-sequence-scanner.h" -#include "exec/hdfs-text-scanner.h" -#include "exec/hdfs-lzo-text-scanner.h" -#include "exec/hdfs-sequence-scanner.h" -#include "exec/hdfs-rcfile-scanner.h" -#include "exec/hdfs-avro-scanner.h" -#include "exec/hdfs-parquet-scanner.h" #include <sstream> -#include <avro/errors.h> -#include <avro/schema.h> -#include <boost/algorithm/string.hpp> -#include <boost/filesystem.hpp> -#include <gutil/strings/substitute.h> -#include <hdfs.h> - -#include "codegen/llvm-codegen.h" #include "common/logging.h" -#include "common/object-pool.h" -#include "exprs/expr-context.h" +#include "exec/hdfs-scanner.h" +#include "exec/scanner-context.h" #include "runtime/descriptors.h" -#include "runtime/hdfs-fs-cache.h" #include "runtime/runtime-filter.inline.h" #include "runtime/runtime-state.h" -#include "runtime/mem-pool.h" #include "runtime/mem-tracker.h" -#include "runtime/raw-value.h" #include "runtime/row-batch.h" -#include "runtime/string-buffer.h" #include "scheduling/query-resource-mgr.h" -#include "util/bit-util.h" -#include "util/container-util.h" #include "util/debug-util.h" #include "util/disk-info.h" -#include "util/error-util.h" -#include "util/hdfs-util.h" -#include "util/impalad-metrics.h" -#include "util/periodic-counter-updater.h" #include "util/runtime-profile-counters.h" -#include "gen-cpp/PlanNodes_types.h" - #include "common/names.h" DEFINE_int32(max_row_batches, 0, "the maximum size of materialized_row_batches_"); -DEFINE_bool(suppress_unknown_disk_id_warnings, false, - "Suppress unknown disk id warnings generated when the HDFS implementation does not" - " provide volume/disk information."); -DEFINE_int32(runtime_filter_wait_time_ms, 1000, "(Advanced) the maximum time, in ms, " - "that a scan node will wait for expected runtime filters to arrive."); #ifndef NDEBUG DECLARE_bool(skip_file_runtime_filtering); #endif -namespace filesystem = boost::filesystem; using namespace impala; -using namespace llvm; -using namespace strings; -using boost::algorithm::join; - -const string HdfsScanNode::HDFS_SPLIT_STATS_DESC = - "Hdfs split stats (<volume id>:<# splits>/<split lengths>)"; // Amount of memory that we approximate a scanner thread will use not including IoBuffers. // The memory used does not vary considerably between file formats (just a couple of MBs). @@ -91,37 +52,17 @@ const int SCANNER_THREAD_MEM_USAGE = 32 * 1024 * 1024; // estimate scanner thread memory usage. const int COMPRESSED_TEXT_COMPRESSION_RATIO = 11; -// Determines how many unexpected remote bytes trigger an error in the runtime state -const int UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD = 64 * 1024 * 1024; - // Amount of time to block waiting for GetNext() to release scanner threads between // checking if a scanner thread should yield itself back to the global thread pool. const int SCANNER_THREAD_WAIT_TIME_MS = 20; HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs) - : ScanNode(pool, tnode, descs), - runtime_state_(NULL), - skip_header_line_count_(tnode.hdfs_scan_node.__isset.skip_header_line_count ? 
- tnode.hdfs_scan_node.skip_header_line_count : 0), - tuple_id_(tnode.hdfs_scan_node.tuple_id), - reader_context_(NULL), - tuple_desc_(NULL), - hdfs_table_(NULL), - unknown_disk_id_warned_(false), - initial_ranges_issued_(false), + : HdfsScanNodeBase(pool, tnode, descs), ranges_issued_barrier_(1), scanner_thread_bytes_required_(0), - max_compressed_text_file_length_(NULL), - disks_accessed_bitmap_(TUnit::UNIT, 0), - bytes_read_local_(NULL), - bytes_read_short_circuit_(NULL), - bytes_read_dn_cache_(NULL), - num_remote_ranges_(NULL), - unexpected_remote_bytes_(NULL), done_(false), all_ranges_started_(false), - counters_running_(false), thread_avail_cb_id_(-1), rm_callback_id_(-1), max_num_scanner_threads_(CpuInfo::num_cores()) { @@ -139,99 +80,6 @@ HdfsScanNode::HdfsScanNode(ObjectPool* pool, const TPlanNode& tnode, HdfsScanNode::~HdfsScanNode() { } -Status HdfsScanNode::Init(const TPlanNode& tnode, RuntimeState* state) { - RETURN_IF_ERROR(ExecNode::Init(tnode, state)); - - // Add collection item conjuncts - const map<TTupleId, vector<TExpr>>& collection_conjuncts = - tnode.hdfs_scan_node.collection_conjuncts; - map<TTupleId, vector<TExpr>>::const_iterator iter = collection_conjuncts.begin(); - for (; iter != collection_conjuncts.end(); ++iter) { - DCHECK(conjuncts_map_[iter->first].empty()); - RETURN_IF_ERROR( - Expr::CreateExprTrees(pool_, iter->second, &conjuncts_map_[iter->first])); - } - - const TQueryOptions& query_options = state->query_options(); - for (const TRuntimeFilterDesc& filter: tnode.runtime_filters) { - auto it = filter.planid_to_target_ndx.find(tnode.node_id); - DCHECK(it != filter.planid_to_target_ndx.end()); - const TRuntimeFilterTargetDesc& target = filter.targets[it->second]; - if (state->query_options().runtime_filter_mode == TRuntimeFilterMode::LOCAL && - !target.is_local_target) { - continue; - } - if (query_options.disable_row_runtime_filtering && - !target.is_bound_by_partition_columns) { - continue; - } - - FilterContext filter_ctx; - RETURN_IF_ERROR(Expr::CreateExprTree(pool_, target.target_expr, &filter_ctx.expr)); - filter_ctx.filter = state->filter_bank()->RegisterFilter(filter, false); - - string filter_profile_title = Substitute("Filter $0 ($1)", filter.filter_id, - PrettyPrinter::Print(filter_ctx.filter->filter_size(), TUnit::BYTES)); - RuntimeProfile* profile = state->obj_pool()->Add( - new RuntimeProfile(state->obj_pool(), filter_profile_title)); - runtime_profile_->AddChild(profile); - filter_ctx.stats = state->obj_pool()->Add(new FilterStats(profile, - target.is_bound_by_partition_columns)); - - filter_ctxs_.push_back(filter_ctx); - } - - // Add row batch conjuncts - DCHECK(conjuncts_map_[tuple_id_].empty()); - conjuncts_map_[tuple_id_] = conjunct_ctxs_; - - return Status::OK(); -} - -bool HdfsScanNode::FilePassesFilterPredicates(const vector<FilterContext>& filter_ctxs, - const THdfsFileFormat::type& format, HdfsFileDesc* file) { -#ifndef NDEBUG - if (FLAGS_skip_file_runtime_filtering) return true; -#endif - if (filter_ctxs_.size() == 0) return true; - ScanRangeMetadata* metadata = - reinterpret_cast<ScanRangeMetadata*>(file->splits[0]->meta_data()); - if (!PartitionPassesFilters(metadata->partition_id, FilterStats::FILES_KEY, - filter_ctxs)) { - for (int j = 0; j < file->splits.size(); ++j) { - // Mark range as complete to ensure progress. 
- RangeComplete(format, file->file_compression); - } - return false; - } - return true; -} - -bool HdfsScanNode::WaitForRuntimeFilters(int32_t time_ms) { - vector<string> arrived_filter_ids; - int32_t start = MonotonicMillis(); - for (auto& ctx: filter_ctxs_) { - if (ctx.filter->WaitForArrival(time_ms)) { - arrived_filter_ids.push_back(Substitute("$0", ctx.filter->id())); - } - } - int32_t end = MonotonicMillis(); - const string& wait_time = PrettyPrinter::Print(end - start, TUnit::TIME_MS); - - if (arrived_filter_ids.size() == filter_ctxs_.size()) { - runtime_profile()->AddInfoString("Runtime filters", - Substitute("All filters arrived. Waited $0", wait_time)); - VLOG_QUERY << "Filters arrived. Waited " << wait_time; - return true; - } - - const string& filter_str = Substitute("Only following filters arrived: $0, waited $1", - join(arrived_filter_ids, ", "), wait_time); - runtime_profile()->AddInfoString("Runtime filters", filter_str); - VLOG_QUERY << filter_str; - return false; -} - Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { SCOPED_TIMER(runtime_profile_->total_time_counter()); @@ -241,35 +89,7 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos // so we need to tell them there is work to do. // TODO: This is probably not worth splitting the organisational cost of splitting // initialisation across two places. Move to before the scanner threads start. - initial_ranges_issued_ = true; - - int32 wait_time_ms = FLAGS_runtime_filter_wait_time_ms; - if (state->query_options().runtime_filter_wait_time_ms > 0) { - wait_time_ms = state->query_options().runtime_filter_wait_time_ms; - } - if (filter_ctxs_.size() > 0) WaitForRuntimeFilters(wait_time_ms); - // Apply dynamic partition-pruning per-file. - FileFormatsMap matching_per_type_files; - for (const FileFormatsMap::value_type& v: per_type_files_) { - vector<HdfsFileDesc*>* matching_files = &matching_per_type_files[v.first]; - for (HdfsFileDesc* file: v.second) { - if (FilePassesFilterPredicates(filter_ctxs_, v.first, file)) { - matching_files->push_back(file); - } - } - } - - // Issue initial ranges for all file types. 
- RETURN_IF_ERROR(HdfsParquetScanner::IssueInitialRanges(this, - matching_per_type_files[THdfsFileFormat::PARQUET])); - RETURN_IF_ERROR(HdfsTextScanner::IssueInitialRanges(this, - matching_per_type_files[THdfsFileFormat::TEXT])); - RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, - matching_per_type_files[THdfsFileFormat::SEQUENCE_FILE])); - RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, - matching_per_type_files[THdfsFileFormat::RC_FILE])); - RETURN_IF_ERROR(BaseSequenceScanner::IssueInitialRanges(this, - matching_per_type_files[THdfsFileFormat::AVRO])); + RETURN_IF_ERROR(IssueInitialScanRanges(state)); // Release the scanner threads ranges_issued_barrier_.Notify(); @@ -278,7 +98,11 @@ Status HdfsScanNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos } Status status = GetNextInternal(state, row_batch, eos); - if (!status.ok() || *eos) StopAndFinalizeCounters(); + if (!status.ok() || *eos) { + unique_lock<mutex> l(lock_); + lock_guard<SpinLock> l2(file_type_counts_); + StopAndFinalizeCounters(); + } return status; } @@ -326,283 +150,16 @@ Status HdfsScanNode::GetNextInternal( return status_; } -DiskIoMgr::ScanRange* HdfsScanNode::AllocateScanRange( - hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id, - int disk_id, bool try_cache, bool expected_local, int64_t mtime, - const DiskIoMgr::ScanRange* original_split) { - DCHECK_GE(disk_id, -1); - // Require that the scan range is within [0, file_length). While this cannot be used - // to guarantee safety (file_length metadata may be stale), it avoids different - // behavior between Hadoop FileSystems (e.g. s3n hdfsSeek() returns error when seeking - // beyond the end of the file). - DCHECK_GE(offset, 0); - DCHECK_GE(len, 0); - DCHECK_LE(offset + len, GetFileDesc(file)->file_length) - << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")"; - disk_id = runtime_state_->io_mgr()->AssignQueue(file, disk_id, expected_local); - - ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add( - new ScanRangeMetadata(partition_id, original_split)); - DiskIoMgr::ScanRange* range = - runtime_state_->obj_pool()->Add(new DiskIoMgr::ScanRange()); - range->Reset(fs, file, len, offset, disk_id, try_cache, expected_local, - mtime, metadata); - return range; -} - -HdfsFileDesc* HdfsScanNode::GetFileDesc(const string& filename) { - DCHECK(file_descs_.find(filename) != file_descs_.end()); - return file_descs_[filename]; -} - -void HdfsScanNode::SetFileMetadata(const string& filename, void* metadata) { - unique_lock<mutex> l(metadata_lock_); - DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end()); - per_file_metadata_[filename] = metadata; -} - -void* HdfsScanNode::GetFileMetadata(const string& filename) { - unique_lock<mutex> l(metadata_lock_); - map<string, void*>::iterator it = per_file_metadata_.find(filename); - if (it == per_file_metadata_.end()) return NULL; - return it->second; -} - -void* HdfsScanNode::GetCodegenFn(THdfsFileFormat::type type) { - CodegendFnMap::iterator it = codegend_fn_map_.find(type); - if (it == codegend_fn_map_.end()) return NULL; - return it->second; -} - -Status HdfsScanNode::CreateAndOpenScanner(HdfsPartitionDescriptor* partition, - ScannerContext* context, scoped_ptr<HdfsScanner>* scanner) { - DCHECK(context != NULL); - THdfsCompression::type compression = - context->GetStream()->file_desc()->file_compression; - - // Create a new scanner for this file format and compression. 
- switch (partition->file_format()) { - case THdfsFileFormat::TEXT: - // Lzo-compressed text files are scanned by a scanner that it is implemented as a - // dynamic library, so that Impala does not include GPL code. - if (compression == THdfsCompression::LZO) { - scanner->reset(HdfsLzoTextScanner::GetHdfsLzoTextScanner(this, runtime_state_)); - } else { - scanner->reset(new HdfsTextScanner(this, runtime_state_, true)); - } - break; - case THdfsFileFormat::SEQUENCE_FILE: - scanner->reset(new HdfsSequenceScanner(this, runtime_state_, true)); - break; - case THdfsFileFormat::RC_FILE: - scanner->reset(new HdfsRCFileScanner(this, runtime_state_, true)); - break; - case THdfsFileFormat::AVRO: - scanner->reset(new HdfsAvroScanner(this, runtime_state_, true)); - break; - case THdfsFileFormat::PARQUET: - scanner->reset(new HdfsParquetScanner(this, runtime_state_, true)); - break; - default: - return Status(Substitute("Unknown Hdfs file format type: $0", - partition->file_format())); - } - DCHECK(scanner->get() != NULL); - Status status = ExecDebugAction(TExecNodePhase::PREPARE_SCANNER, runtime_state_); - if (status.ok()) { - status = scanner->get()->Open(context); - if (!status.ok()) scanner->get()->Close(scanner->get()->batch()); - } else { - context->ClearStreams(); - } - return status; -} - -Tuple* HdfsScanNode::InitTemplateTuple(RuntimeState* state, - const vector<ExprContext*>& value_ctxs) { - if (partition_key_slots_.empty()) return NULL; - - // Lock to protect access to partition_key_pool_ and value_ctxs - // TODO: we can push the lock to the mempool and exprs_values should not - // use internal memory. - Tuple* template_tuple = InitEmptyTemplateTuple(*tuple_desc_); - - unique_lock<mutex> l(lock_); - for (int i = 0; i < partition_key_slots_.size(); ++i) { - const SlotDescriptor* slot_desc = partition_key_slots_[i]; - // Exprs guaranteed to be literals, so can safely be evaluated without a row context - void* value = value_ctxs[slot_desc->col_pos()]->GetValue(NULL); - RawValue::Write(value, template_tuple, slot_desc, NULL); - } - return template_tuple; -} - -Tuple* HdfsScanNode::InitEmptyTemplateTuple(const TupleDescriptor& tuple_desc) { - Tuple* template_tuple = NULL; - { - unique_lock<mutex> l(lock_); - template_tuple = Tuple::Create(tuple_desc.byte_size(), scan_node_pool_.get()); - } - memset(template_tuple, 0, tuple_desc.byte_size()); - return template_tuple; -} - -void HdfsScanNode::TransferToScanNodePool(MemPool* pool) { - unique_lock<mutex> l(lock_); - scan_node_pool_->AcquireData(pool, false); -} - -Status HdfsScanNode::TriggerDebugAction() { - return ExecDebugAction(TExecNodePhase::GETNEXT, runtime_state_); -} - Status HdfsScanNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); - runtime_state_ = state; - RETURN_IF_ERROR(ScanNode::Prepare(state)); - - tuple_desc_ = state->desc_tbl().GetTupleDescriptor(tuple_id_); - DCHECK(tuple_desc_ != NULL); - - // Prepare collection conjuncts - ConjunctsMap::const_iterator iter = conjuncts_map_.begin(); - for (; iter != conjuncts_map_.end(); ++iter) { - TupleDescriptor* tuple_desc = state->desc_tbl().GetTupleDescriptor(iter->first); - - // conjuncts_ are already prepared in ExecNode::Prepare(), don't try to prepare again - if (tuple_desc == tuple_desc_) continue; - - RowDescriptor* collection_row_desc = - state->obj_pool()->Add(new RowDescriptor(tuple_desc, /* is_nullable */ false)); - RETURN_IF_ERROR( - Expr::Prepare(iter->second, state, *collection_row_desc, expr_mem_tracker())); - } + 
RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state)); + // Assign scanner thread group to cgroup, if any. if (!state->cgroup().empty()) { scanner_threads_.SetCgroupsMgr(state->exec_env()->cgroups_mgr()); scanner_threads_.SetCgroup(state->cgroup()); } - // One-time initialisation of state that is constant across scan ranges - DCHECK(tuple_desc_->table_desc() != NULL); - hdfs_table_ = static_cast<const HdfsTableDescriptor*>(tuple_desc_->table_desc()); - scan_node_pool_.reset(new MemPool(mem_tracker())); - - for (int i = 0; i < filter_ctxs_.size(); ++i) { - RETURN_IF_ERROR( - filter_ctxs_[i].expr->Prepare(state, row_desc(), expr_mem_tracker())); - AddExprCtxToFree(filter_ctxs_[i].expr); - } - - // Parse Avro table schema if applicable - const string& avro_schema_str = hdfs_table_->avro_schema(); - if (!avro_schema_str.empty()) { - avro_schema_t avro_schema; - int error = avro_schema_from_json_length( - avro_schema_str.c_str(), avro_schema_str.size(), &avro_schema); - if (error != 0) { - return Status(Substitute("Failed to parse table schema: $0", avro_strerror())); - } - RETURN_IF_ERROR(AvroSchemaElement::ConvertSchema(avro_schema, avro_schema_.get())); - } - - // Gather materialized partition-key slots and non-partition slots. - const vector<SlotDescriptor*>& slots = tuple_desc_->slots(); - for (size_t i = 0; i < slots.size(); ++i) { - if (hdfs_table_->IsClusteringCol(slots[i])) { - partition_key_slots_.push_back(slots[i]); - } else { - materialized_slots_.push_back(slots[i]); - } - } - - // Order the materialized slots such that for schemaless file formats (e.g. text) the - // order corresponds to the physical order in files. For formats where the file schema - // is independent of the table schema (e.g. Avro, Parquet), this step is not necessary. - sort(materialized_slots_.begin(), materialized_slots_.end(), - SlotDescriptor::ColPathLessThan); - - // Populate mapping from slot path to index into materialized_slots_. - for (int i = 0; i < materialized_slots_.size(); ++i) { - path_to_materialized_slot_idx_[materialized_slots_[i]->col_path()] = i; - } - - // Initialize is_materialized_col_ - is_materialized_col_.resize(hdfs_table_->num_cols()); - for (int i = 0; i < hdfs_table_->num_cols(); ++i) { - is_materialized_col_[i] = GetMaterializedSlotIdx(vector<int>(1, i)) != SKIP_COLUMN; - } - - HdfsFsCache::HdfsFsMap fs_cache; - // Convert the TScanRangeParams into per-file DiskIO::ScanRange objects and populate - // partition_ids_, file_descs_, and per_type_files_. - DCHECK(scan_range_params_ != NULL) - << "Must call SetScanRanges() before calling Prepare()"; - int num_ranges_missing_volume_id = 0; - for (int i = 0; i < scan_range_params_->size(); ++i) { - DCHECK((*scan_range_params_)[i].scan_range.__isset.hdfs_file_split); - const THdfsFileSplit& split = (*scan_range_params_)[i].scan_range.hdfs_file_split; - partition_ids_.insert(split.partition_id); - HdfsPartitionDescriptor* partition_desc = - hdfs_table_->GetPartition(split.partition_id); - if (partition_desc == NULL) { - // TODO: this should be a DCHECK but we sometimes hit it. It's likely IMPALA-1702. - LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id() - << " partition_id=" << split.partition_id - << "\n" << PrintThrift(state->fragment_params()); - return Status("Query encountered invalid metadata, likely due to IMPALA-1702." 
- " Try rerunning the query."); - } - - if (partition_template_tuple_map_.find(split.partition_id) == - partition_template_tuple_map_.end()) { - partition_template_tuple_map_[split.partition_id] = - InitTemplateTuple(state, partition_desc->partition_key_value_ctxs());; - } - - filesystem::path file_path(partition_desc->location()); - file_path.append(split.file_name, filesystem::path::codecvt()); - const string& native_file_path = file_path.native(); - - HdfsFileDesc* file_desc = NULL; - FileDescMap::iterator file_desc_it = file_descs_.find(native_file_path); - if (file_desc_it == file_descs_.end()) { - // Add new file_desc to file_descs_ and per_type_files_ - file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path)); - file_descs_[native_file_path] = file_desc; - file_desc->file_length = split.file_length; - file_desc->mtime = split.mtime; - file_desc->file_compression = split.file_compression; - RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection( - native_file_path, &file_desc->fs, &fs_cache)); - num_unqueued_files_.Add(1); - per_type_files_[partition_desc->file_format()].push_back(file_desc); - } else { - // File already processed - file_desc = file_desc_it->second; - } - - bool expected_local = - (*scan_range_params_)[i].__isset.is_remote && !(*scan_range_params_)[i].is_remote; - if (expected_local && (*scan_range_params_)[i].volume_id == -1) { - if (!FLAGS_suppress_unknown_disk_id_warnings && !unknown_disk_id_warned_) { - runtime_profile()->AppendExecOption("Missing Volume Id"); - runtime_state()->LogError(ErrorMsg(TErrorCode::HDFS_SCAN_NODE_UNKNOWN_DISK)); - unknown_disk_id_warned_ = true; - } - ++num_ranges_missing_volume_id; - } - - bool try_cache = (*scan_range_params_)[i].is_cached; - if (runtime_state_->query_options().disable_cached_reads) { - DCHECK(!try_cache) << "Params should not have had this set."; - } - file_desc->splits.push_back( - AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length, - split.offset, split.partition_id, (*scan_range_params_)[i].volume_id, - try_cache, expected_local, file_desc->mtime)); - } - // Compute the minimum bytes required to start a new thread. This is based on the // file format. // The higher the estimate, the less likely it is the query will fail but more likely @@ -632,100 +189,17 @@ Status HdfsScanNode::Prepare(RuntimeState* state) { } } scanner_thread_bytes_required_ += scanner_thread_mem_usage; - - // Prepare all the partitions scanned by the scan node - for (int64_t partition_id: partition_ids_) { - HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id); - // This is IMPALA-1702, but will have been caught earlier in this method. - DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id() - << " partition_id=" << partition_id - << "\n" << PrintThrift(state->fragment_params()); - RETURN_IF_ERROR(partition_desc->PrepareExprs(state)); - } - - // Update server wide metrics for number of scan ranges and ranges that have - // incomplete metadata. 
- ImpaladMetrics::NUM_RANGES_PROCESSED->Increment(scan_range_params_->size()); - ImpaladMetrics::NUM_RANGES_MISSING_VOLUME_ID->Increment(num_ranges_missing_volume_id); - - // Add per volume stats to the runtime profile - PerVolumnStats per_volume_stats; - stringstream str; - UpdateHdfsSplitStats(*scan_range_params_, &per_volume_stats); - PrintHdfsSplitStats(per_volume_stats, &str); - runtime_profile()->AddInfoString(HDFS_SPLIT_STATS_DESC, str.str()); - - // Create codegen'd functions - for (int format = THdfsFileFormat::TEXT; - format <= THdfsFileFormat::PARQUET; ++format) { - vector<HdfsFileDesc*>& file_descs = - per_type_files_[static_cast<THdfsFileFormat::type>(format)]; - - if (file_descs.empty()) continue; - - // Randomize the order this node processes the files. We want to do this to avoid - // issuing remote reads to the same DN from different impalads. In file formats such - // as avro/seq/rc (i.e. splittable with a header), every node first reads the header. - // If every node goes through the files in the same order, all the remote reads are - // for the same file meaning a few DN serves a lot of remote reads at the same time. - random_shuffle(file_descs.begin(), file_descs.end()); - - // Create reusable codegen'd functions for each file type type needed - // TODO: do this for conjuncts_map_ - Function* fn; - Status status; - switch (format) { - case THdfsFileFormat::TEXT: - status = HdfsTextScanner::Codegen(this, conjunct_ctxs_, &fn); - break; - case THdfsFileFormat::SEQUENCE_FILE: - status = HdfsSequenceScanner::Codegen(this, conjunct_ctxs_, &fn); - break; - case THdfsFileFormat::AVRO: - status = HdfsAvroScanner::Codegen(this, conjunct_ctxs_, &fn); - break; - case THdfsFileFormat::PARQUET: - status = HdfsParquetScanner::Codegen(this, conjunct_ctxs_, &fn); - break; - default: - // No codegen for this format - fn = NULL; - status = Status("Not implemented for this format."); - } - DCHECK(fn != NULL || !status.ok()); - - const char* format_name = _THdfsFileFormat_VALUES_TO_NAMES.find(format)->second; - if (!status.ok()) { - runtime_profile()->AddCodegenMsg(false, status, format_name); - } else { - runtime_profile()->AddCodegenMsg(true, status, format_name); - LlvmCodeGen* codegen; - RETURN_IF_ERROR(runtime_state_->GetCodegen(&codegen)); - codegen->AddFunctionToJit( - fn, &codegend_fn_map_[static_cast<THdfsFileFormat::type>(format)]); - } - } - return Status::OK(); } -// This function initiates the connection to hdfs and starts up the initial scanner -// threads. The scanner subclasses are passed the initial splits. Scanners are expected to -// queue up a non-zero number of those splits to the io mgr (via the ScanNode). Scan -// ranges are not issued until the first GetNext() call; scanner threads will block on -// ranges_issued_barrier_ until ranges are issued. +// This function registers the ThreadTokenAvailableCb to start up the initial scanner +// threads. Scan ranges are not issued until the first GetNext() call; scanner threads +// will block on ranges_issued_barrier_ until ranges are issued. 
 Status HdfsScanNode::Open(RuntimeState* state) {
-  RETURN_IF_ERROR(ExecNode::Open(state));
-
-  // Open collection conjuncts
-  ConjunctsMap::const_iterator iter = conjuncts_map_.begin();
-  for (; iter != conjuncts_map_.end(); ++iter) {
-    // conjuncts_ are already opened in ExecNode::Open()
-    if (iter->first == tuple_id_) continue;
-    RETURN_IF_ERROR(Expr::Open(iter->second, state));
-  }
+  SCOPED_TIMER(runtime_profile_->total_time_counter());
+  RETURN_IF_ERROR(HdfsScanNodeBase::Open(state));

-  for (auto& filter_ctx: filter_ctxs_) RETURN_IF_ERROR(filter_ctx.expr->Open(state));
+  if (file_descs_.empty() || progress_.done()) return Status::OK();

   // We need at least one scanner thread to make progress. We need to make this
   // reservation before any ranges are issued.
@@ -744,95 +218,9 @@ Status HdfsScanNode::Open(RuntimeState* state) {
         runtime_state_->resource_pool()));
   }

-  if (file_descs_.empty()) {
-    SetDone();
-    return Status::OK();
-  }
-
-  // Open all the partition exprs used by the scan node
-  for (int64_t partition_id: partition_ids_) {
-    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
-    DCHECK(partition_desc != NULL) << "table_id=" << hdfs_table_->id()
-        << " partition_id=" << partition_id
-        << "\n" << PrintThrift(state->fragment_params());
-    RETURN_IF_ERROR(partition_desc->OpenExprs(state));
-  }
-
-  RETURN_IF_ERROR(runtime_state_->io_mgr()->RegisterContext(
-      &reader_context_, mem_tracker()));
-
-  // Initialize HdfsScanNode specific counters
-  read_timer_ = ADD_TIMER(runtime_profile(), TOTAL_HDFS_READ_TIMER);
-  per_read_thread_throughput_counter_ = runtime_profile()->AddDerivedCounter(
-      PER_READ_THREAD_THROUGHPUT_COUNTER, TUnit::BYTES_PER_SECOND,
-      bind<int64_t>(&RuntimeProfile::UnitsPerSecond, bytes_read_counter_, read_timer_));
-  scan_ranges_complete_counter_ =
-      ADD_COUNTER(runtime_profile(), SCAN_RANGES_COMPLETE_COUNTER, TUnit::UNIT);
-  if (DiskInfo::num_disks() < 64) {
-    num_disks_accessed_counter_ =
-        ADD_COUNTER(runtime_profile(), NUM_DISKS_ACCESSED_COUNTER, TUnit::UNIT);
-  } else {
-    num_disks_accessed_counter_ = NULL;
-  }
-  num_scanner_threads_started_counter_ =
-      ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
-
-  runtime_state_->io_mgr()->set_bytes_read_counter(reader_context_, bytes_read_counter());
-  runtime_state_->io_mgr()->set_read_timer(reader_context_, read_timer());
-  runtime_state_->io_mgr()->set_active_read_thread_counter(reader_context_,
-      &active_hdfs_read_thread_counter_);
-  runtime_state_->io_mgr()->set_disks_access_bitmap(reader_context_,
-      &disks_accessed_bitmap_);
-
-  average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
-      AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
-  average_hdfs_read_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
-      AVERAGE_HDFS_READ_THREAD_CONCURRENCY, &active_hdfs_read_thread_counter_);
-
-  bytes_read_local_ = ADD_COUNTER(runtime_profile(), "BytesReadLocal",
-      TUnit::BYTES);
-  bytes_read_short_circuit_ = ADD_COUNTER(runtime_profile(), "BytesReadShortCircuit",
-      TUnit::BYTES);
-  bytes_read_dn_cache_ = ADD_COUNTER(runtime_profile(), "BytesReadDataNodeCache",
-      TUnit::BYTES);
-  num_remote_ranges_ = ADD_COUNTER(runtime_profile(), "RemoteScanRanges",
-      TUnit::UNIT);
-  unexpected_remote_bytes_ = ADD_COUNTER(runtime_profile(), "BytesReadRemoteUnexpected",
-      TUnit::BYTES);
-
-  max_compressed_text_file_length_ = runtime_profile()->AddHighWaterMarkCounter(
-      "MaxCompressedTextFileLength", TUnit::BYTES);
-
-  for (int i = 0; i < state->io_mgr()->num_total_disks() + 1; ++i) {
-    hdfs_read_thread_concurrency_bucket_.push_back(
-        pool_->Add(new RuntimeProfile::Counter(TUnit::DOUBLE_VALUE, 0)));
-  }
-  runtime_profile()->RegisterBucketingCounters(&active_hdfs_read_thread_counter_,
-      &hdfs_read_thread_concurrency_bucket_);
-
-  counters_running_ = true;
-
-  int total_splits = 0;
-  for (FileDescMap::iterator it = file_descs_.begin(); it != file_descs_.end(); ++it) {
-    total_splits += it->second->splits.size();
-  }
-
-  if (total_splits == 0) {
-    SetDone();
-    return Status::OK();
-  }
-
-  stringstream ss;
-  ss << "Splits complete (node=" << id() << "):";
-  progress_.Init(ss.str(), total_splits);
   return Status::OK();
 }

-Status HdfsScanNode::Reset(RuntimeState* state) {
-  DCHECK(false) << "Internal error: Scan nodes should not appear in subplans.";
-  return Status("Internal error: Scan nodes should not appear in subplans.");
-}
-
 void HdfsScanNode::Close(RuntimeState* state) {
   if (is_closed()) return;
   SetDone();
@@ -849,47 +237,36 @@ void HdfsScanNode::Close(RuntimeState* state) {
   num_owned_io_buffers_.Add(-materialized_row_batches_->Cleanup());
   DCHECK_EQ(num_owned_io_buffers_.Load(), 0) << "ScanNode has leaked io buffers";

-  if (reader_context_ != NULL) {
-    // There may still be io buffers used by parent nodes so we can't unregister the
-    // reader context yet. The runtime state keeps a list of all the reader contexts and
-    // they are unregistered when the fragment is closed.
-    state->reader_contexts()->push_back(reader_context_);
-    // Need to wait for all the active scanner threads to finish to ensure there is no
-    // more memory tracked by this scan node's mem tracker.
-    state->io_mgr()->CancelContext(reader_context_, true);
-  }
-
-  StopAndFinalizeCounters();
+  HdfsScanNodeBase::Close(state);
+}

-  // There should be no active scanner threads and hdfs read threads.
-  DCHECK_EQ(active_scanner_thread_counter_.value(), 0);
-  DCHECK_EQ(active_hdfs_read_thread_counter_.value(), 0);
+void HdfsScanNode::SetFileMetadata(const string& filename, void* metadata) {
+  unique_lock<mutex> l(metadata_lock_);
+  DCHECK(per_file_metadata_.find(filename) == per_file_metadata_.end());
+  per_file_metadata_[filename] = metadata;
+}

-  if (scan_node_pool_.get() != NULL) scan_node_pool_->FreeAll();
+void* HdfsScanNode::GetFileMetadata(const string& filename) {
+  unique_lock<mutex> l(metadata_lock_);
+  map<string, void*>::iterator it = per_file_metadata_.find(filename);
+  if (it == per_file_metadata_.end()) return NULL;
+  return it->second;
+}

-  // Close all the partitions scanned by the scan node
-  for (int64_t partition_id: partition_ids_) {
-    HdfsPartitionDescriptor* partition_desc = hdfs_table_->GetPartition(partition_id);
-    if (partition_desc == NULL) {
-      // TODO: Revert when IMPALA-1702 is fixed.
-      LOG(ERROR) << "Bad table descriptor! table_id=" << hdfs_table_->id()
-          << " partition_id=" << partition_id
-          << "\n" << PrintThrift(state->fragment_params());
-      continue;
-    }
-    partition_desc->CloseExprs(state);
-  }
+void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
+    const std::vector<THdfsCompression::type>& compression_type) {
+  lock_guard<SpinLock> l(file_type_counts_);
+  HdfsScanNodeBase::RangeComplete(file_type, compression_type);
+}

-  // Close collection conjuncts
-  ConjunctsMap::const_iterator iter = conjuncts_map_.begin();
-  for (; iter != conjuncts_map_.end(); ++iter) {
-    // conjuncts_ are already closed in ExecNode::Close()
-    if (iter->first == tuple_id_) continue;
-    Expr::Close(iter->second, state);
-  }
+void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {
+  unique_lock<mutex> l(lock_);
+  scan_node_pool_->AcquireData(pool, false);
+}

-  for (auto& filter_ctx: filter_ctxs_) filter_ctx.expr->Close(state);
-  ScanNode::Close(state);
+void HdfsScanNode::AddMaterializedRowBatch(RowBatch* row_batch) {
+  InitNullCollectionValues(row_batch);
+  materialized_row_batches_->AddBatch(row_batch);
 }

 Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges,
@@ -902,43 +279,6 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges
   return Status::OK();
 }

-void HdfsScanNode::AddMaterializedRowBatch(RowBatch* row_batch) {
-  InitNullCollectionValues(row_batch);
-  materialized_row_batches_->AddBatch(row_batch);
-}
-
-void HdfsScanNode::InitNullCollectionValues(const TupleDescriptor* tuple_desc,
-    Tuple* tuple) const {
-  for (const SlotDescriptor* slot_desc: tuple_desc->collection_slots()) {
-    CollectionValue* slot = reinterpret_cast<CollectionValue*>(
-        tuple->GetSlot(slot_desc->tuple_offset()));
-    if (tuple->IsNull(slot_desc->null_indicator_offset())) {
-      *slot = CollectionValue();
-      continue;
-    }
-    // Recursively traverse collection items.
-    const TupleDescriptor* item_desc = slot_desc->collection_item_descriptor();
-    if (item_desc->collection_slots().empty()) continue;
-    for (int i = 0; i < slot->num_tuples; ++i) {
-      int item_offset = i * item_desc->byte_size();
-      Tuple* collection_item = reinterpret_cast<Tuple*>(slot->ptr + item_offset);
-      InitNullCollectionValues(item_desc, collection_item);
-    }
-  }
-}
-
-void HdfsScanNode::InitNullCollectionValues(RowBatch* row_batch) const {
-  DCHECK_EQ(row_batch->row_desc().tuple_descriptors().size(), 1);
-  const TupleDescriptor& tuple_desc =
-      *row_batch->row_desc().tuple_descriptors()[tuple_idx()];
-  if (tuple_desc.collection_slots().empty()) return;
-  for (int i = 0; i < row_batch->num_rows(); ++i) {
-    Tuple* tuple = row_batch->GetRow(i)->GetTuple(tuple_idx());
-    DCHECK(tuple != NULL);
-    InitNullCollectionValues(&tuple_desc, tuple);
-  }
-}
-
 // For controlling the amount of memory used for scanners, we approximate the
 // scanner mem usage based on scanner_thread_bytes_required_, rather than the
 // consumption in the scan node's mem tracker. The problem with the scan node
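The comment above is cut off at the hunk boundary, so the rationale it goes on to give is not visible in this diff. The idea it describes, admitting scanner threads based on an up-front per-thread byte estimate (scanner_thread_bytes_required_) rather than on the mem tracker's current consumption, can be sketched in isolation. The following is only an illustrative sketch of that shape; MemEstimate and CanStartNewScannerThread are invented names, not Impala APIs, and the real check is more involved.

// Illustrative only: admit a new scanner thread from an up-front estimate instead of
// tracked consumption. A freshly started thread has not yet allocated its buffers, so
// a consumption-based check can undercount what is about to be used.
#include <cstdint>

struct MemEstimate {
  int64_t mem_limit;                   // bytes the scan node may use
  int64_t estimated_bytes_per_thread;  // e.g. io buffer size * buffers per range
  int64_t active_scanner_threads;      // scanner threads currently running
};

// True if one more scanner thread is expected to fit within the memory budget.
bool CanStartNewScannerThread(const MemEstimate& e) {
  const int64_t projected =
      (e.active_scanner_threads + 1) * e.estimated_bytes_per_thread;
  return projected <= e.mem_limit;
}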
@@ -1161,33 +501,6 @@ void HdfsScanNode::ScannerThread() {
   runtime_state_->resource_pool()->ReleaseThreadToken(false);
 }

-bool HdfsScanNode::PartitionPassesFilters(int32_t partition_id,
-    const string& stats_name, const vector<FilterContext>& filter_ctxs) {
-  if (filter_ctxs.size() == 0) return true;
-  DCHECK_EQ(filter_ctxs.size(), filter_ctxs_.size())
-      << "Mismatched number of filter contexts";
-  Tuple* template_tuple = partition_template_tuple_map_[partition_id];
-  // Defensive - if template_tuple is NULL, there can be no filters on partition columns.
-  if (template_tuple == NULL) return true;
-  TupleRow* tuple_row_mem = reinterpret_cast<TupleRow*>(&template_tuple);
-  for (const FilterContext& ctx: filter_ctxs) {
-    int target_ndx = ctx.filter->filter_desc().planid_to_target_ndx.at(id_);
-    if (!ctx.filter->filter_desc().targets[target_ndx].is_bound_by_partition_columns) {
-      continue;
-    }
-    void* e = ctx.expr->GetValue(tuple_row_mem);
-
-    // Not quite right because bitmap could arrive after Eval(), but we're ok with
-    // off-by-one errors.
-    bool processed = ctx.filter->HasBloomFilter();
-    bool passed_filter = ctx.filter->Eval<void>(e, ctx.expr->root()->type());
-    ctx.stats->IncrCounters(stats_name, 1, processed, !passed_filter);
-    if (!passed_filter) return false;
-  }
-
-  return true;
-}
-
 namespace {

 // Returns true if 'format' uses a scanner derived from BaseSequenceScanner. Used to
@@ -1205,8 +518,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
   DCHECK(scan_range != NULL);

-  ScanRangeMetadata* metadata =
-      reinterpret_cast<ScanRangeMetadata*>(scan_range->meta_data());
+  ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data());
   int64_t partition_id = metadata->partition_id;
   HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
   DCHECK(partition != NULL) << "table_id=" << hdfs_table_->id()
@@ -1269,26 +581,6 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
   return status;
 }

-void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
-    const THdfsCompression::type& compression_type) {
-  vector<THdfsCompression::type> types;
-  types.push_back(compression_type);
-  RangeComplete(file_type, types);
-}
-
-void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
-    const vector<THdfsCompression::type>& compression_types) {
-  scan_ranges_complete_counter()->Add(1);
-  progress_.Update(1);
-
-  {
-    lock_guard<SpinLock> l(file_type_counts_lock_);
-    for (int i = 0; i < compression_types.size(); ++i) {
-      ++file_type_counts_[make_pair(file_type, compression_types[i])];
-    }
-  }
-}
-
 void HdfsScanNode::SetDone() {
   {
     unique_lock<mutex> l(lock_);
@@ -1300,129 +592,3 @@ void HdfsScanNode::SetDone() {
   }
   materialized_row_batches_->Shutdown();
 }
-
-void HdfsScanNode::ComputeSlotMaterializationOrder(vector<int>* order) const {
-  const vector<ExprContext*>& conjuncts = ExecNode::conjunct_ctxs();
-  // Initialize all order to be conjuncts.size() (after the last conjunct)
-  order->insert(order->begin(), materialized_slots().size(), conjuncts.size());
-
-  const DescriptorTbl& desc_tbl = runtime_state_->desc_tbl();
-
-  vector<SlotId> slot_ids;
-  for (int conjunct_idx = 0; conjunct_idx < conjuncts.size(); ++conjunct_idx) {
-    slot_ids.clear();
-    int num_slots = conjuncts[conjunct_idx]->root()->GetSlotIds(&slot_ids);
-    for (int j = 0; j < num_slots; ++j) {
-      SlotDescriptor* slot_desc = desc_tbl.GetSlotDescriptor(slot_ids[j]);
-      int slot_idx = GetMaterializedSlotIdx(slot_desc->col_path());
-      // slot_idx == -1 means this was a partition key slot which is always
-      // materialized before any slots.
-      if (slot_idx == -1) continue;
-      // If this slot hasn't been assigned an order, assign it be materialized
-      // before evaluating conjuncts[i]
-      if ((*order)[slot_idx] == conjuncts.size()) {
-        (*order)[slot_idx] = conjunct_idx;
-      }
-    }
-  }
-}
-
-void HdfsScanNode::StopAndFinalizeCounters() {
-  unique_lock<mutex> l(lock_);
-  if (!counters_running_) return;
-  counters_running_ = false;
-
-  PeriodicCounterUpdater::StopTimeSeriesCounter(bytes_read_timeseries_counter_);
-  PeriodicCounterUpdater::StopRateCounter(total_throughput_counter());
-  PeriodicCounterUpdater::StopSamplingCounter(average_scanner_thread_concurrency_);
-  PeriodicCounterUpdater::StopSamplingCounter(average_hdfs_read_thread_concurrency_);
-  PeriodicCounterUpdater::StopBucketingCounters(&hdfs_read_thread_concurrency_bucket_,
-      true);
-
-  // Output hdfs read thread concurrency into info string
-  stringstream ss;
-  for (int i = 0; i < hdfs_read_thread_concurrency_bucket_.size(); ++i) {
-    ss << i << ":" << setprecision(4)
-       << hdfs_read_thread_concurrency_bucket_[i]->double_value() << "% ";
-  }
-  runtime_profile_->AddInfoString("Hdfs Read Thread Concurrency Bucket", ss.str());
-
-  // Convert disk access bitmap to num of disk accessed
-  uint64_t num_disk_bitmap = disks_accessed_bitmap_.value();
-  int64_t num_disk_accessed = BitUtil::Popcount(num_disk_bitmap);
-  if (num_disks_accessed_counter_ != NULL) {
-    num_disks_accessed_counter_->Set(num_disk_accessed);
-  }
-
-  // output completed file types and counts to info string
-  if (!file_type_counts_.empty()) {
-    stringstream ss;
-    {
-      lock_guard<SpinLock> l2(file_type_counts_lock_);
-      for (FileTypeCountsMap::const_iterator it = file_type_counts_.begin();
-          it != file_type_counts_.end(); ++it) {
-        ss << it->first.first << "/" << it->first.second << ":" << it->second << " ";
-      }
-    }
-    runtime_profile_->AddInfoString("File Formats", ss.str());
-  }
-
-  // Output fraction of scanners with codegen enabled
-  int num_enabled = num_scanners_codegen_enabled_.Load();
-  int total = num_enabled + num_scanners_codegen_disabled_.Load();
-  runtime_profile()->AppendExecOption(
-      Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
-
-  if (reader_context_ != NULL) {
-    bytes_read_local_->Set(runtime_state_->io_mgr()->bytes_read_local(reader_context_));
-    bytes_read_short_circuit_->Set(
-        runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_));
-    bytes_read_dn_cache_->Set(
-        runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_));
-    num_remote_ranges_->Set(static_cast<int64_t>(
-        runtime_state_->io_mgr()->num_remote_ranges(reader_context_)));
-    unexpected_remote_bytes_->Set(
-        runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_));
-
-    if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
-      runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(
-          "Read $0 of data across network that was expected to be local. "
-          "Block locality metadata for table '$1.$2' may be stale. Consider running "
-          "\"INVALIDATE METADATA `$1`.`$2`\".",
-          PrettyPrinter::Print(unexpected_remote_bytes_->value(), TUnit::BYTES),
-          hdfs_table_->database(), hdfs_table_->name())));
-    }
-
-    ImpaladMetrics::IO_MGR_BYTES_READ->Increment(bytes_read_counter()->value());
-    ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ->Increment(
-        bytes_read_local_->value());
-    ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ->Increment(
-        bytes_read_short_circuit_->value());
-    ImpaladMetrics::IO_MGR_CACHED_BYTES_READ->Increment(
-        bytes_read_dn_cache_->value());
-  }
-}
-
-void HdfsScanNode::UpdateHdfsSplitStats(
-    const vector<TScanRangeParams>& scan_range_params_list,
-    PerVolumnStats* per_volume_stats) {
-  pair<int, int64_t> init_value(0, 0);
-  for (const TScanRangeParams& scan_range_params: scan_range_params_list) {
-    const TScanRange& scan_range = scan_range_params.scan_range;
-    if (!scan_range.__isset.hdfs_file_split) continue;
-    const THdfsFileSplit& split = scan_range.hdfs_file_split;
-    pair<int, int64_t>* stats =
-        FindOrInsert(per_volume_stats, scan_range_params.volume_id, init_value);
-    ++(stats->first);
-    stats->second += split.length;
-  }
-}
-
-void HdfsScanNode::PrintHdfsSplitStats(const PerVolumnStats& per_volume_stats,
-    stringstream* ss) {
-  for (PerVolumnStats::const_iterator i = per_volume_stats.begin();
-      i != per_volume_stats.end(); ++i) {
-    (*ss) << i->first << ":" << i->second.first << "/"
-          << PrettyPrinter::Print(i->second.second, TUnit::BYTES) << " ";
-  }
-}
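The UpdateHdfsSplitStats()/PrintHdfsSplitStats() pair removed above (now hosted in HdfsScanNodeBase) reduces to aggregating, per volume id, a (split count, total bytes) pair and printing it as "volume:count/bytes". A minimal standalone sketch of that aggregation follows; FakeSplit and PerVolumeStats are simplified stand-ins for the Thrift TScanRangeParams/THdfsFileSplit types and Impala's PerVolumnStats typedef, and the real code additionally skips ranges without an hdfs_file_split and uses FindOrInsert() and PrettyPrinter.

// Standalone illustration of the per-volume split aggregation; not the Impala types.
#include <cstdint>
#include <iostream>
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

struct FakeSplit {
  int volume_id;    // disk volume hosting the split (-1 if unknown)
  int64_t length;   // split length in bytes
};

// volume id -> (number of splits, total bytes)
typedef std::map<int, std::pair<int, int64_t>> PerVolumeStats;

void UpdateStats(const std::vector<FakeSplit>& splits, PerVolumeStats* stats) {
  for (const FakeSplit& s : splits) {
    std::pair<int, int64_t>& entry = (*stats)[s.volume_id];  // (0, 0) on first access
    ++entry.first;
    entry.second += s.length;
  }
}

std::string PrintStats(const PerVolumeStats& stats) {
  std::stringstream ss;
  for (const auto& kv : stats) {
    ss << kv.first << ":" << kv.second.first << "/" << kv.second.second << "B ";
  }
  return ss.str();
}

int main() {
  PerVolumeStats stats;
  UpdateStats({{0, 64 << 20}, {0, 32 << 20}, {1, 128 << 20}}, &stats);
  std::cout << PrintStats(stats) << std::endl;  // "0:2/100663296B 1:1/134217728B"
  return 0;
}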
