IMPALA-4835 (prep only): create io subfolder and namespace Instead of using the DiskIoMgr class as a namespace, which prevents forward-declaration of inner classes, create an impala::io namespace and unnest the inner classes.
This is done in anticipation of DiskIoMgr depending on BufferPool. This helps avoid a circular dependency between DiskIoMgr, TmpFileMgr and BufferPool headers that could not be broken with forward declarations. Testing: Ran core tests. Change-Id: If807f93a47d8027a43e56dd80b1b535d0bb74e1b Reviewed-on: http://gerrit.cloudera.org:8080/8424 Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/b840137c Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/b840137c Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/b840137c Branch: refs/heads/master Commit: b840137c940d71af5cec2daf482b523a38b6a9f1 Parents: 2510fe0 Author: Tim Armstrong <tarmstr...@cloudera.com> Authored: Mon Oct 30 16:34:47 2017 -0700 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Fri Nov 17 22:47:34 2017 +0000 ---------------------------------------------------------------------- be/CMakeLists.txt | 2 + be/src/exec/base-sequence-scanner.cc | 9 +- be/src/exec/hdfs-parquet-scanner.cc | 36 +- be/src/exec/hdfs-parquet-scanner.h | 4 +- be/src/exec/hdfs-scan-node-base.cc | 25 +- be/src/exec/hdfs-scan-node-base.h | 26 +- be/src/exec/hdfs-scan-node-mt.h | 2 +- be/src/exec/hdfs-scan-node.cc | 7 +- be/src/exec/hdfs-scan-node.h | 6 +- be/src/exec/hdfs-text-scanner.cc | 9 +- be/src/exec/kudu-scan-node.cc | 2 +- be/src/exec/scanner-context.cc | 12 +- be/src/exec/scanner-context.h | 14 +- be/src/runtime/CMakeLists.txt | 10 +- be/src/runtime/disk-io-mgr-handle-cache.h | 196 --- .../runtime/disk-io-mgr-handle-cache.inline.h | 231 ---- be/src/runtime/disk-io-mgr-internal.h | 76 -- be/src/runtime/disk-io-mgr-reader-context.cc | 292 ----- be/src/runtime/disk-io-mgr-reader-context.h | 406 ------ be/src/runtime/disk-io-mgr-scan-range.cc | 591 --------- 
be/src/runtime/disk-io-mgr-stress-test.cc | 60 - be/src/runtime/disk-io-mgr-stress.cc | 246 ---- be/src/runtime/disk-io-mgr-stress.h | 94 -- be/src/runtime/disk-io-mgr-test.cc | 1127 ----------------- be/src/runtime/disk-io-mgr.cc | 1190 ----------------- be/src/runtime/disk-io-mgr.h | 972 -------------- be/src/runtime/exec-env.cc | 4 +- be/src/runtime/exec-env.h | 9 +- be/src/runtime/io/CMakeLists.txt | 36 + be/src/runtime/io/disk-io-mgr-internal.h | 78 ++ be/src/runtime/io/disk-io-mgr-stress-test.cc | 61 + be/src/runtime/io/disk-io-mgr-stress.cc | 247 ++++ be/src/runtime/io/disk-io-mgr-stress.h | 95 ++ be/src/runtime/io/disk-io-mgr-test.cc | 1129 +++++++++++++++++ be/src/runtime/io/disk-io-mgr.cc | 1191 ++++++++++++++++++ be/src/runtime/io/disk-io-mgr.h | 550 ++++++++ be/src/runtime/io/handle-cache.h | 197 +++ be/src/runtime/io/handle-cache.inline.h | 232 ++++ be/src/runtime/io/request-context.cc | 293 +++++ be/src/runtime/io/request-context.h | 403 ++++++ be/src/runtime/io/request-ranges.h | 471 +++++++ be/src/runtime/io/scan-range.cc | 593 +++++++++ be/src/runtime/row-batch.h | 2 +- be/src/runtime/runtime-state.cc | 2 +- be/src/runtime/runtime-state.h | 7 +- be/src/runtime/test-env.h | 2 +- be/src/runtime/tmp-file-mgr-test.cc | 10 +- be/src/runtime/tmp-file-mgr.cc | 20 +- be/src/runtime/tmp-file-mgr.h | 20 +- 49 files changed, 5702 insertions(+), 5595 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index bf7aa26..163567a 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -361,6 +361,7 @@ set (IMPALA_LINK_LIBS GlobalFlags histogram_proto ImpalaThrift + Io kudu_util krpc Rpc @@ -386,6 +387,7 @@ set (IMPALA_LINK_LIBS if (BUILD_SHARED_LIBS) set (IMPALA_LINK_LIBS ${IMPALA_LINK_LIBS} BufferPool + Io Runtime Exec 
CodeGen http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/base-sequence-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc index fcf58c6..7f20e31 100644 --- a/be/src/exec/base-sequence-scanner.cc +++ b/be/src/exec/base-sequence-scanner.cc @@ -32,6 +32,7 @@ #include "common/names.h" using namespace impala; +using namespace impala::io; const int BaseSequenceScanner::HEADER_SIZE = 1024; const int BaseSequenceScanner::SYNC_MARKER = -1; @@ -48,7 +49,7 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, // Issue just the header range for each file. When the header is complete, // we'll issue the splits for that file. Splits cannot be processed until the // header is parsed (the header object is then shared across splits for that file). - vector<DiskIoMgr::ScanRange*> header_ranges; + vector<ScanRange*> header_ranges; for (int i = 0; i < files.size(); ++i) { ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(files[i]->splits[0]->meta_data()); @@ -57,9 +58,9 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, // it is not cached. // TODO: add remote disk id and plumb that through to the io mgr. It should have // 1 queue for each NIC as well? - DiskIoMgr::ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs, + ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(), header_size, 0, metadata->partition_id, -1, false, - DiskIoMgr::BufferOpts::Uncached()); + BufferOpts::Uncached()); header_ranges.push_back(header_range); } // Issue the header ranges only. 
GetNextInternal() will issue the files' scan ranges @@ -310,7 +311,7 @@ void BaseSequenceScanner::CloseFileRanges(const char* filename) { DCHECK(only_parsing_header_); HdfsFileDesc* desc = scan_node_->GetFileDesc( context_->partition_descriptor()->id(), filename); - const vector<DiskIoMgr::ScanRange*>& splits = desc->splits; + const vector<ScanRange*>& splits = desc->splits; for (int i = 0; i < splits.size(); ++i) { COUNTER_ADD(bytes_skipped_counter_, splits[i]->len()); scan_node_->RangeComplete(file_format(), THdfsCompression::NONE); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 7fae959..f407877 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -27,6 +27,7 @@ #include "exec/parquet-column-stats.h" #include "exec/scanner-context.inline.h" #include "runtime/collection-value-builder.h" +#include "runtime/io/disk-io-mgr.h" #include "runtime/runtime-state.h" #include "runtime/runtime-filter.inline.h" #include "rpc/thrift-util.h" @@ -35,6 +36,7 @@ using std::move; using namespace impala; +using namespace impala::io; DEFINE_double(parquet_min_filter_reject_ratio, 0.1, "(Advanced) If the percentage of " "rows rejected by a runtime filter drops below this value, the filter is disabled."); @@ -67,7 +69,7 @@ const string PARQUET_MEM_LIMIT_EXCEEDED = Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, const std::vector<HdfsFileDesc*>& files) { - vector<DiskIoMgr::ScanRange*> footer_ranges; + vector<ScanRange*> footer_ranges; for (int i = 0; i < files.size(); ++i) { // If the file size is less than 12 bytes, it is an invalid Parquet file. 
if (files[i]->file_length < 12) { @@ -80,10 +82,10 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, DCHECK_GE(footer_start, 0); // Try to find the split with the footer. - DiskIoMgr::ScanRange* footer_split = FindFooterSplit(files[i]); + ScanRange* footer_split = FindFooterSplit(files[i]); for (int j = 0; j < files[i]->splits.size(); ++j) { - DiskIoMgr::ScanRange* split = files[i]->splits[j]; + ScanRange* split = files[i]->splits[j]; DCHECK_LE(split->offset() + split->len(), files[i]->file_length); // If there are no materialized slots (such as count(*) over the table), we can @@ -98,19 +100,19 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, // is done here, followed by scan ranges for the columns of each row group within // the actual split (in InitColumns()). The original split is stored in the // metadata associated with the footer range. - DiskIoMgr::ScanRange* footer_range; + ScanRange* footer_range; if (footer_split != NULL) { footer_range = scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(), footer_size, footer_start, split_metadata->partition_id, footer_split->disk_id(), footer_split->expected_local(), - DiskIoMgr::BufferOpts(footer_split->try_cache(), files[i]->mtime), split); + BufferOpts(footer_split->try_cache(), files[i]->mtime), split); } else { // If we did not find the last split, we know it is going to be a remote read. 
footer_range = scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(), footer_size, footer_start, split_metadata->partition_id, -1, false, - DiskIoMgr::BufferOpts::Uncached(), split); + BufferOpts::Uncached(), split); } footer_ranges.push_back(footer_range); @@ -125,10 +127,10 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, return Status::OK(); } -DiskIoMgr::ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) { +ScanRange* HdfsParquetScanner::FindFooterSplit(HdfsFileDesc* file) { DCHECK(file != NULL); for (int i = 0; i < file->splits.size(); ++i) { - DiskIoMgr::ScanRange* split = file->splits[i]; + ScanRange* split = file->splits[i]; if (split->offset() + split->len() == file->file_length) return split; } return NULL; @@ -341,7 +343,7 @@ static int64_t GetRowGroupMidOffset(const parquet::RowGroup& row_group) { // Returns true if 'row_group' overlaps with 'split_range'. static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group, - const DiskIoMgr::ScanRange* split_range) { + const ScanRange* split_range) { int64_t row_group_start = GetColumnStartOffset(row_group.columns[0].meta_data); const parquet::ColumnMetaData& last_column = @@ -598,7 +600,7 @@ Status HdfsParquetScanner::EvaluateStatsConjuncts( } Status HdfsParquetScanner::NextRowGroup() { - const DiskIoMgr::ScanRange* split_range = static_cast<ScanRangeMetadata*>( + const ScanRange* split_range = static_cast<ScanRangeMetadata*>( metadata_range_->meta_data())->original_split; int64_t split_offset = split_range->offset(); int64_t split_length = split_range->len(); @@ -1377,12 +1379,12 @@ Status HdfsParquetScanner::ProcessFooter() { DiskIoMgr* io_mgr = scan_node_->runtime_state()->io_mgr(); // Read the header into the metadata buffer. 
- DiskIoMgr::ScanRange* metadata_range = scan_node_->AllocateScanRange( + ScanRange* metadata_range = scan_node_->AllocateScanRange( metadata_range_->fs(), filename(), metadata_size, metadata_start, partition_id, metadata_range_->disk_id(), metadata_range_->expected_local(), - DiskIoMgr::BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size)); + BufferOpts::ReadInto(metadata_buffer.buffer(), metadata_size)); - unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer; + unique_ptr<BufferDescriptor> io_buffer; RETURN_IF_ERROR( io_mgr->Read(scan_node_->reader_context(), metadata_range, &io_buffer)); DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer()); @@ -1589,7 +1591,7 @@ Status HdfsParquetScanner::InitColumns( parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx]; // All the scan ranges (one for each column). - vector<DiskIoMgr::ScanRange*> col_ranges; + vector<ScanRange*> col_ranges; // Used to validate that the number of values in each reader in column_readers_ is the // same. int num_values = -1; @@ -1656,17 +1658,17 @@ Status HdfsParquetScanner::InitColumns( "filename '$1'", col_chunk.file_path, filename())); } - const DiskIoMgr::ScanRange* split_range = + const ScanRange* split_range = static_cast<ScanRangeMetadata*>(metadata_range_->meta_data())->original_split; // Determine if the column is completely contained within a local split. 
bool col_range_local = split_range->expected_local() && col_start >= split_range->offset() && col_end <= split_range->offset() + split_range->len(); - DiskIoMgr::ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(), + ScanRange* col_range = scan_node_->AllocateScanRange(metadata_range_->fs(), filename(), col_len, col_start, partition_id, split_range->disk_id(), col_range_local, - DiskIoMgr::BufferOpts(split_range->try_cache(), file_desc->mtime)); + BufferOpts(split_range->try_cache(), file_desc->mtime)); col_ranges.push_back(col_range); // Get the stream that will be used for this column http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-parquet-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.h b/be/src/exec/hdfs-parquet-scanner.h index e4b6ae7..0eea458 100644 --- a/be/src/exec/hdfs-parquet-scanner.h +++ b/be/src/exec/hdfs-parquet-scanner.h @@ -442,7 +442,7 @@ class HdfsParquetScanner : public HdfsScanner { ParquetFileVersion file_version_; /// Scan range for the metadata. - const DiskIoMgr::ScanRange* metadata_range_; + const io::ScanRange* metadata_range_; /// Pool to copy dictionary page buffer into. This pool is shared across all the /// pages in a column chunk. @@ -585,7 +585,7 @@ class HdfsParquetScanner : public HdfsScanner { /// Find and return the last split in the file if it is assigned to this scan node. /// Returns NULL otherwise. - static DiskIoMgr::ScanRange* FindFooterSplit(HdfsFileDesc* file); + static io::ScanRange* FindFooterSplit(HdfsFileDesc* file); /// Process the file footer and parse file_metadata_. This should be called with the /// last FOOTER_SIZE bytes in context_. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node-base.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index 9149097..62dbd6a 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -32,11 +32,12 @@ #include "codegen/llvm-codegen.h" #include "common/logging.h" #include "common/object-pool.h" -#include "exprs/scalar-expr.h" #include "exprs/scalar-expr-evaluator.h" +#include "exprs/scalar-expr.h" #include "runtime/descriptors.h" -#include "runtime/disk-io-mgr-reader-context.h" #include "runtime/hdfs-fs-cache.h" +#include "runtime/io/disk-io-mgr.h" +#include "runtime/io/request-context.h" #include "runtime/runtime-filter.inline.h" #include "runtime/runtime-state.h" #include "util/disk-info.h" @@ -54,6 +55,7 @@ DECLARE_bool(skip_file_runtime_filtering); namespace filesystem = boost::filesystem; using namespace impala; +using namespace impala::io; using namespace strings; const string HdfsScanNodeBase::HDFS_SPLIT_STATS_DESC = @@ -236,7 +238,7 @@ Status HdfsScanNodeBase::Prepare(RuntimeState* state) { file_desc->splits.push_back( AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length, split.offset, split.partition_id, params.volume_id, expected_local, - DiskIoMgr::BufferOpts(try_cache, file_desc->mtime))); + BufferOpts(try_cache, file_desc->mtime))); } // Update server wide metrics for number of scan ranges and ranges that have @@ -485,10 +487,10 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates( return true; } -DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, +ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local, - const DiskIoMgr::BufferOpts& buffer_opts, - const DiskIoMgr::ScanRange* original_split) { + const 
BufferOpts& buffer_opts, + const ScanRange* original_split) { DCHECK_GE(disk_id, -1); // Require that the scan range is within [0, file_length). While this cannot be used // to guarantee safety (file_length metadata may be stale), it avoids different @@ -502,21 +504,20 @@ DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* ScanRangeMetadata* metadata = runtime_state_->obj_pool()->Add( new ScanRangeMetadata(partition_id, original_split)); - DiskIoMgr::ScanRange* range = - runtime_state_->obj_pool()->Add(new DiskIoMgr::ScanRange()); + ScanRange* range = runtime_state_->obj_pool()->Add(new ScanRange); range->Reset(fs, file, len, offset, disk_id, expected_local, buffer_opts, metadata); return range; } -DiskIoMgr::ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, +ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache, - bool expected_local, int mtime, const DiskIoMgr::ScanRange* original_split) { + bool expected_local, int mtime, const ScanRange* original_split) { return AllocateScanRange(fs, file, len, offset, partition_id, disk_id, expected_local, - DiskIoMgr::BufferOpts(try_cache, mtime), original_split); + BufferOpts(try_cache, mtime), original_split); } Status HdfsScanNodeBase::AddDiskIoRanges( - const vector<DiskIoMgr::ScanRange*>& ranges, int num_files_queued) { + const vector<ScanRange*>& ranges, int num_files_queued) { RETURN_IF_ERROR(runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges)); num_unqueued_files_.Add(-num_files_queued); DCHECK_GE(num_unqueued_files_.Load(), 0); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node-base.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index e6b2154..923b50a 100644 --- 
a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -31,11 +31,11 @@ #include "exec/filter-context.h" #include "exec/scan-node.h" #include "runtime/descriptors.h" -#include "runtime/disk-io-mgr.h" +#include "runtime/io/request-ranges.h" #include "util/avro-util.h" +#include "util/container-util.h" #include "util/progress-updater.h" #include "util/spinlock.h" -#include "util/container-util.h" namespace impala { @@ -72,7 +72,7 @@ struct HdfsFileDesc { THdfsCompression::type file_compression; /// Splits (i.e. raw byte ranges) for this file, assigned to this scan node. - std::vector<DiskIoMgr::ScanRange*> splits; + std::vector<io::ScanRange*> splits; }; /// Struct for additional metadata for scan ranges. This contains the partition id @@ -84,9 +84,9 @@ struct ScanRangeMetadata { /// For parquet scan ranges we initially create a request for the file footer for each /// split; we store a pointer to the actual split so that we can recover its information /// for the scanner to process. 
- const DiskIoMgr::ScanRange* original_split; + const io::ScanRange* original_split; - ScanRangeMetadata(int64_t partition_id, const DiskIoMgr::ScanRange* original_split) + ScanRangeMetadata(int64_t partition_id, const io::ScanRange* original_split) : partition_id(partition_id), original_split(original_split) { } }; @@ -154,7 +154,7 @@ class HdfsScanNodeBase : public ScanNode { const HdfsTableDescriptor* hdfs_table() const { return hdfs_table_; } const AvroSchemaElement& avro_schema() const { return *avro_schema_.get(); } int skip_header_line_count() const { return skip_header_line_count_; } - DiskIoRequestContext* reader_context() const { return reader_context_.get(); } + io::RequestContext* reader_context() const { return reader_context_.get(); } bool optimize_parquet_count_star() const { return optimize_parquet_count_star_; } int parquet_count_star_slot_offset() const { return parquet_count_star_slot_offset_; } @@ -204,22 +204,22 @@ class HdfsScanNodeBase : public ScanNode { /// If not NULL, the 'original_split' pointer is stored for reference in the scan range /// metadata of the scan range that is to be allocated. /// This is thread safe. - DiskIoMgr::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len, + io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local, - const DiskIoMgr::BufferOpts& buffer_opts, - const DiskIoMgr::ScanRange* original_split = NULL); + const io::BufferOpts& buffer_opts, + const io::ScanRange* original_split = NULL); /// Old API for compatibility with text scanners (e.g. LZO text scanner). 
- DiskIoMgr::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len, + io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool try_cache, - bool expected_local, int mtime, const DiskIoMgr::ScanRange* original_split = NULL); + bool expected_local, int mtime, const io::ScanRange* original_split = NULL); /// Adds ranges to the io mgr queue. 'num_files_queued' indicates how many file's scan /// ranges have been added completely. A file's scan ranges are added completely if no /// new scanner threads will be needed to process that file besides the additional /// threads needed to process those in 'ranges'. /// Can be overridden to add scan-node specific actions like starting scanner threads. - virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges, + virtual Status AddDiskIoRanges(const std::vector<io::ScanRange*>& ranges, int num_files_queued) WARN_UNUSED_RESULT; /// Adds all splits for file_desc to the io mgr queue and indicates one file has @@ -336,7 +336,7 @@ class HdfsScanNodeBase : public ScanNode { const int parquet_count_star_slot_offset_; /// RequestContext object to use with the disk-io-mgr for reads. - std::unique_ptr<DiskIoRequestContext> reader_context_; + std::unique_ptr<io::RequestContext> reader_context_; /// Descriptor for tuples this scan node constructs const TupleDescriptor* tuple_desc_ = nullptr; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node-mt.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h index 4ce12fe..3502b18 100644 --- a/be/src/exec/hdfs-scan-node-mt.h +++ b/be/src/exec/hdfs-scan-node-mt.h @@ -50,7 +50,7 @@ class HdfsScanNodeMt : public HdfsScanNodeBase { private: /// Current scan range and corresponding scanner. 
- DiskIoMgr::ScanRange* scan_range_; + io::ScanRange* scan_range_; boost::scoped_ptr<ScannerContext> scanner_ctx_; boost::scoped_ptr<HdfsScanner> scanner_; }; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 78f2ffa..2d58c05 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -43,6 +43,7 @@ DECLARE_bool(skip_file_runtime_filtering); #endif using namespace impala; +using namespace impala::io; // Amount of memory that we approximate a scanner thread will use not including IoBuffers. // The memory used does not vary considerably between file formats (just a couple of MBs). @@ -251,7 +252,7 @@ void HdfsScanNode::AddMaterializedRowBatch(unique_ptr<RowBatch> row_batch) { materialized_row_batches_->AddBatch(move(row_batch)); } -Status HdfsScanNode::AddDiskIoRanges(const vector<DiskIoMgr::ScanRange*>& ranges, +Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges, int num_files_queued) { RETURN_IF_ERROR( runtime_state_->io_mgr()->AddScanRanges(reader_context_.get(), ranges)); @@ -420,7 +421,7 @@ void HdfsScanNode::ScannerThread() { // to return if there's an error. ranges_issued_barrier_.Wait(SCANNER_THREAD_WAIT_TIME_MS, &unused); - DiskIoMgr::ScanRange* scan_range; + ScanRange* scan_range; // Take a snapshot of num_unqueued_files_ before calling GetNextRange(). // We don't want num_unqueued_files_ to go to zero between the return from // GetNextRange() and the check for when all ranges are complete. 
@@ -480,7 +481,7 @@ exit: } Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs, - MemPool* expr_results_pool, DiskIoMgr::ScanRange* scan_range) { + MemPool* expr_results_pool, ScanRange* scan_range) { DCHECK(scan_range != NULL); ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(scan_range->meta_data()); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h index 30435c2..a1c97cf 100644 --- a/be/src/exec/hdfs-scan-node.h +++ b/be/src/exec/hdfs-scan-node.h @@ -29,7 +29,7 @@ #include "exec/filter-context.h" #include "exec/hdfs-scan-node-base.h" -#include "runtime/disk-io-mgr.h" +#include "runtime/io/disk-io-mgr.h" #include "util/counting-barrier.h" #include "util/thread.h" @@ -79,7 +79,7 @@ class HdfsScanNode : public HdfsScanNodeBase { bool done() const { return done_; } /// Adds ranges to the io mgr queue and starts up new scanner threads if possible. - virtual Status AddDiskIoRanges(const std::vector<DiskIoMgr::ScanRange*>& ranges, + virtual Status AddDiskIoRanges(const std::vector<io::ScanRange*>& ranges, int num_files_queued) WARN_UNUSED_RESULT; /// Adds a materialized row batch for the scan node. This is called from scanner @@ -166,7 +166,7 @@ class HdfsScanNode : public HdfsScanNodeBase { /// thread. 'filter_ctxs' is a clone of the class-wide filter_ctxs_, used to filter rows /// in this split. Status ProcessSplit(const std::vector<FilterContext>& filter_ctxs, - MemPool* expr_results_pool, DiskIoMgr::ScanRange* scan_range) WARN_UNUSED_RESULT; + MemPool* expr_results_pool, io::ScanRange* scan_range) WARN_UNUSED_RESULT; /// Returns true if there is enough memory (against the mem tracker limits) to /// have a scanner thread. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/hdfs-text-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc index d633734..487c6fc 100644 --- a/be/src/exec/hdfs-text-scanner.cc +++ b/be/src/exec/hdfs-text-scanner.cc @@ -40,6 +40,7 @@ using boost::algorithm::ends_with; using boost::algorithm::to_lower; using namespace impala; +using namespace impala::io; using namespace strings; const char* HdfsTextScanner::LLVM_CLASS_NAME = "class.impala::HdfsTextScanner"; @@ -74,7 +75,7 @@ HdfsTextScanner::~HdfsTextScanner() { Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, const vector<HdfsFileDesc*>& files) { - vector<DiskIoMgr::ScanRange*> compressed_text_scan_ranges; + vector<ScanRange*> compressed_text_scan_ranges; int compressed_text_files = 0; vector<HdfsFileDesc*> lzo_text_files; for (int i = 0; i < files.size(); ++i) { @@ -95,7 +96,7 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, // In order to decompress gzip-, snappy- and bzip2-compressed text files, we // need to read entire files. Only read a file if we're assigned the first split // to avoid reading multi-block files with multiple scanners. - DiskIoMgr::ScanRange* split = files[i]->splits[j]; + ScanRange* split = files[i]->splits[j]; // We only process the split that starts at offset 0. 
if (split->offset() != 0) { @@ -114,10 +115,10 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, DCHECK_GT(files[i]->file_length, 0); ScanRangeMetadata* metadata = static_cast<ScanRangeMetadata*>(split->meta_data()); - DiskIoMgr::ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs, + ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(), files[i]->file_length, 0, metadata->partition_id, split->disk_id(), split->expected_local(), - DiskIoMgr::BufferOpts(split->try_cache(), files[i]->mtime)); + BufferOpts(split->try_cache(), files[i]->mtime)); compressed_text_scan_ranges.push_back(file_range); scan_node->max_compressed_text_file_length()->Set(files[i]->file_length); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/kudu-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc index 77fac89..6d5e085 100644 --- a/be/src/exec/kudu-scan-node.cc +++ b/be/src/exec/kudu-scan-node.cc @@ -52,7 +52,7 @@ KuduScanNode::KuduScanNode(ObjectPool* pool, const TPlanNode& tnode, // This value is built the same way as it assumes that the scan node runs co-located // with a Kudu tablet server and that the tablet server is using disks similarly as // a datanode would. 
- max_row_batches = 10 * (DiskInfo::num_disks() + DiskIoMgr::REMOTE_NUM_DISKS); + max_row_batches = 10 * (DiskInfo::num_disks() + io::DiskIoMgr::REMOTE_NUM_DISKS); } materialized_row_batches_.reset(new RowBatchQueue(max_row_batches)); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/scanner-context.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc index 8cb195d..d9de769 100644 --- a/be/src/exec/scanner-context.cc +++ b/be/src/exec/scanner-context.cc @@ -21,6 +21,7 @@ #include "exec/hdfs-scan-node-base.h" #include "exec/hdfs-scan-node.h" +#include "runtime/io/disk-io-mgr.h" #include "runtime/exec-env.h" #include "runtime/mem-pool.h" #include "runtime/row-batch.h" @@ -32,6 +33,7 @@ #include "common/names.h" using namespace impala; +using namespace impala::io; using namespace strings; static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024; @@ -43,7 +45,7 @@ static const int64_t INIT_READ_PAST_SIZE_BYTES = 64 * 1024; static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0; ScannerContext::ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node, - HdfsPartitionDescriptor* partition_desc, DiskIoMgr::ScanRange* scan_range, + HdfsPartitionDescriptor* partition_desc, ScanRange* scan_range, const vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool) : state_(state), scan_node_(scan_node), @@ -75,7 +77,7 @@ ScannerContext::Stream::Stream(ScannerContext* parent) boundary_buffer_(new StringBuffer(boundary_pool_.get())) { } -ScannerContext::Stream* ScannerContext::AddStream(DiskIoMgr::ScanRange* range) { +ScannerContext::Stream* ScannerContext::AddStream(ScanRange* range) { std::unique_ptr<Stream> stream(new Stream(this)); stream->scan_range_ = range; stream->file_desc_ = scan_node_->GetFileDesc(partition_desc_->id(), stream->filename()); @@ -105,7 +107,7 @@ void ScannerContext::Stream::ReleaseCompletedResources(bool done) { 
scan_range_->Cancel(Status::CANCELLED); } - for (unique_ptr<DiskIoMgr::BufferDescriptor>& buffer : completed_io_buffers_) { + for (unique_ptr<BufferDescriptor>& buffer : completed_io_buffers_) { ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(buffer)); } parent_->num_completed_io_buffers_ -= completed_io_buffers_.size(); @@ -164,9 +166,9 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) { return Status::OK(); } int64_t partition_id = parent_->partition_descriptor()->id(); - DiskIoMgr::ScanRange* range = parent_->scan_node_->AllocateScanRange( + ScanRange* range = parent_->scan_node_->AllocateScanRange( scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id, - scan_range_->disk_id(), false, DiskIoMgr::BufferOpts::Uncached()); + scan_range_->disk_id(), false, BufferOpts::Uncached()); RETURN_IF_ERROR(parent_->state_->io_mgr()->Read( parent_->scan_node_->reader_context(), range, &io_buffer_)); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/exec/scanner-context.h ---------------------------------------------------------------------- diff --git a/be/src/exec/scanner-context.h b/be/src/exec/scanner-context.h index 216209f..3ad6753 100644 --- a/be/src/exec/scanner-context.h +++ b/be/src/exec/scanner-context.h @@ -27,7 +27,7 @@ #include "common/compiler-util.h" #include "common/status.h" #include "exec/filter-context.h" -#include "runtime/disk-io-mgr.h" +#include "runtime/io/request-ranges.h" namespace impala { @@ -65,7 +65,7 @@ class ScannerContext { /// get pushed to) and the scan range to process. /// This context starts with 1 stream. ScannerContext(RuntimeState*, HdfsScanNodeBase*, HdfsPartitionDescriptor*, - DiskIoMgr::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs, + io::ScanRange* scan_range, const std::vector<FilterContext>& filter_ctxs, MemPool* expr_results_pool); /// Destructor verifies that all stream objects have been released. 
@@ -125,7 +125,7 @@ class ScannerContext { bool eof() const { return file_offset() == file_len_; } const char* filename() { return scan_range_->file(); } - const DiskIoMgr::ScanRange* scan_range() { return scan_range_; } + const io::ScanRange* scan_range() { return scan_range_; } const HdfsFileDesc* file_desc() { return file_desc_; } /// Returns the buffer's current offset in the file. @@ -176,7 +176,7 @@ class ScannerContext { private: friend class ScannerContext; ScannerContext* parent_; - DiskIoMgr::ScanRange* scan_range_; + io::ScanRange* scan_range_; const HdfsFileDesc* file_desc_; /// Total number of bytes returned from GetBytes() @@ -195,7 +195,7 @@ class ScannerContext { int64_t next_read_past_size_bytes_; /// The current io buffer. This starts as NULL before we've read any bytes. - std::unique_ptr<DiskIoMgr::BufferDescriptor> io_buffer_; + std::unique_ptr<io::BufferDescriptor> io_buffer_; /// Next byte to read in io_buffer_ uint8_t* io_buffer_pos_; @@ -227,7 +227,7 @@ class ScannerContext { /// On the next GetBytes() call, these buffers are released (the caller by calling /// GetBytes() signals it is done with its previous bytes). At this point the /// buffers are returned to the I/O manager. - std::deque<std::unique_ptr<DiskIoMgr::BufferDescriptor>> completed_io_buffers_; + std::deque<std::unique_ptr<io::BufferDescriptor>> completed_io_buffers_; Stream(ScannerContext* parent); @@ -290,7 +290,7 @@ class ScannerContext { /// Add a stream to this ScannerContext for 'range'. Returns the added stream. /// The stream is created in the runtime state's object pool - Stream* AddStream(DiskIoMgr::ScanRange* range); + Stream* AddStream(io::ScanRange* range); /// Returns false if scan_node_ is multi-threaded and has been cancelled. /// Always returns false if the scan_node_ is not multi-threaded. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 41805af..0d4b61c 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -16,6 +16,7 @@ # under the License. add_subdirectory(bufferpool) +add_subdirectory(io) # where to put generated libraries set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime") @@ -36,10 +37,6 @@ add_library(Runtime data-stream-sender.cc debug-options.cc descriptors.cc - disk-io-mgr.cc - disk-io-mgr-reader-context.cc - disk-io-mgr-scan-range.cc - disk-io-mgr-stress.cc exec-env.cc fragment-instance-state.cc hbase-table.cc @@ -78,16 +75,11 @@ add_library(Runtime ) add_dependencies(Runtime gen-deps) -# This test runs forever so should not be part of 'make test' -add_executable(disk-io-mgr-stress-test disk-io-mgr-stress-test.cc) -target_link_libraries(disk-io-mgr-stress-test ${IMPALA_TEST_LINK_LIBS}) - ADD_BE_TEST(mem-pool-test) ADD_BE_TEST(free-pool-test) ADD_BE_TEST(string-buffer-test) ADD_BE_TEST(data-stream-test) ADD_BE_TEST(timestamp-test) -ADD_BE_TEST(disk-io-mgr-test) ADD_BE_TEST(raw-value-test) ADD_BE_TEST(string-compare-test) ADD_BE_TEST(string-search-test) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-handle-cache.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-handle-cache.h b/be/src/runtime/disk-io-mgr-handle-cache.h deleted file mode 100644 index 4ba2342..0000000 --- a/be/src/runtime/disk-io-mgr-handle-cache.h +++ /dev/null @@ -1,196 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_H -#define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_H - -#include <array> -#include <list> -#include <map> -#include <memory> - -#include <boost/thread/mutex.hpp> - -#include "common/hdfs.h" -#include "common/status.h" -#include "util/aligned-new.h" -#include "util/impalad-metrics.h" -#include "util/spinlock.h" -#include "util/thread.h" - -namespace impala { - -/// This class is a small wrapper around the hdfsFile handle and the file system -/// instance which is needed to close the file handle. The handle incorporates -/// the last modified time of the file when it was opened. This is used to distinguish -/// between file handles for files that can be updated or overwritten. -class HdfsFileHandle { - public: - - /// Constructor will open the file - HdfsFileHandle(const hdfsFS& fs, const char* fname, int64_t mtime); - - /// Destructor will close the file handle - ~HdfsFileHandle(); - - hdfsFile file() const { return hdfs_file_; } - int64_t mtime() const { return mtime_; } - bool ok() const { return hdfs_file_ != nullptr; } - - private: - hdfsFS fs_; - hdfsFile hdfs_file_; - int64_t mtime_; -}; - -/// The FileHandleCache is a data structure that owns HdfsFileHandles to share between -/// threads. The HdfsFileHandles are hash partitioned across NUM_PARTITIONS partitions. 
-/// Each partition operates independently with its own locks, reducing contention -/// between concurrent threads. The `capacity` is split between the partitions and is -/// enforced independently. -/// -/// Threads check out a file handle for exclusive access and return it when finished. -/// If the file handle is not already present in the cache or all file handles for this -/// file are checked out, the file handle is constructed and added to the cache. -/// The cache can contain multiple file handles for the same file. If a file handle -/// is checked out, it cannot be evicted from the cache. In this case, a cache can -/// exceed the specified capacity. -/// -/// The file handle cache is currently not suitable for remote files that maintain a -/// connection as part of the handle. Most remote systems have a limit on the number -/// of concurrent connections, and file handles in the cache would be counted towards -/// that limit. -/// -/// If there is a file handle in the cache and the underlying file is deleted, -/// the file handle might keep the file from being deleted at the OS level. This can -/// take up disk space and impact correctness. To avoid this, the cache will evict any -/// file handle that has been unused for longer than threshold specified by -/// `unused_handle_timeout_secs`. Eviction is disabled when the threshold is 0. -/// -/// TODO: The cache should also evict file handles more aggressively if the file handle's -/// mtime is older than the file's current mtime. -template <size_t NUM_PARTITIONS> -class FileHandleCache { - public: - /// Instantiates the cache with `capacity` split evenly across NUM_PARTITIONS - /// partitions. If the capacity does not split evenly, then the capacity is rounded - /// up. The cache will age out any file handle that is unused for - /// `unused_handle_timeout_secs` seconds. Age out is disabled if this is set to zero. 
- FileHandleCache(size_t capacity, uint64_t unused_handle_timeout_secs); - - /// Destructor is only called for backend tests - ~FileHandleCache(); - - /// Starts up a thread that monitors the age of file handles and evicts any that - /// exceed the limit. - Status Init() WARN_UNUSED_RESULT; - - /// Get a file handle from the cache for the specified filename (fname) and - /// last modification time (mtime). This will hash the filename to determine - /// which partition to use for this file handle. - /// - /// If 'require_new_handle' is false and the partition contains an available handle, - /// the handle is returned and cache_hit is set to true. Otherwise, the partition will - /// try to construct a file handle and add it to the partition. On success, the new - /// file handle will be returned with cache_hit set to false. On failure, nullptr will - /// be returned. In either case, the partition may evict a file handle to make room - /// for the new file handle. - /// - /// This obtains exclusive control over the returned file handle. It must be paired - /// with a call to ReleaseFileHandle to release exclusive control. - HdfsFileHandle* GetFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime, - bool require_new_handle, bool* cache_hit); - - /// Release the exclusive hold on the specified file handle (which was obtained - /// by calling GetFileHandle). The cache may evict a file handle if the cache is - /// above capacity. If 'destroy_handle' is true, immediately remove this handle - /// from the cache. 
- void ReleaseFileHandle(std::string* fname, HdfsFileHandle* fh, bool destroy_handle); - - private: - struct FileHandleEntry; - typedef std::multimap<std::string, FileHandleEntry> MapType; - - struct LruListEntry { - LruListEntry(typename MapType::iterator map_entry_in); - typename MapType::iterator map_entry; - uint64_t timestamp_seconds; - }; - typedef std::list<LruListEntry> LruListType; - - struct FileHandleEntry { - FileHandleEntry(HdfsFileHandle* fh_in, LruListType& lru_list) - : fh(fh_in), lru_entry(lru_list.end()) {} - std::unique_ptr<HdfsFileHandle> fh; - - /// in_use is true for a file handle checked out via GetFileHandle() that has not - /// been returned via ReleaseFileHandle(). - bool in_use = false; - - /// Iterator to this element's location in the LRU list. This only points to a - /// valid location when in_use is true. For error-checking, this is set to - /// lru_list.end() when in_use is false. - typename LruListType::iterator lru_entry; - }; - - /// Each partition operates independently, and thus has its own cache, LRU list, - /// and corresponding lock. To avoid contention on the lock_ due to false sharing - /// the partitions are aligned to cache line boundaries. - struct FileHandleCachePartition : public CacheLineAligned { - /// Protects access to cache and lru_list. - SpinLock lock; - - /// Multimap from the file name to the file handles for that file. The cache - /// can contain multiple file handles for the same file and some may have - /// different mtimes if the file is being modified. All file handles are always - /// owned by the cache. - MapType cache; - - /// The LRU list only contains file handles that are not in use. - LruListType lru_list; - - /// Maximum number of file handles in cache without evicting unused file handles. - /// It is not a strict limit, and can be exceeded if all file handles are in use. 
- size_t capacity; - - /// Current number of file handles in the cache - size_t size; - }; - - /// Periodic check to evict unused file handles. Only executed by eviction_thread_. - void EvictHandlesLoop(); - static const int64_t EVICT_HANDLES_PERIOD_MS = 1000; - - /// If the partition is above its capacity, evict the oldest unused file handles to - /// enforce the capacity. - void EvictHandles(FileHandleCachePartition& p); - - std::array<FileHandleCachePartition, NUM_PARTITIONS> cache_partitions_; - - /// Maximum time before an unused file handle is aged out of the cache. - /// Aging out is disabled if this is set to 0. - uint64_t unused_handle_timeout_secs_; - - /// Thread to check for unused file handles to evict. This thread will exit when - /// the shut_down_promise_ is set. - std::unique_ptr<Thread> eviction_thread_; - Promise<bool> shut_down_promise_; -}; - -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-handle-cache.inline.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-handle-cache.inline.h b/be/src/runtime/disk-io-mgr-handle-cache.inline.h deleted file mode 100644 index 3068971..0000000 --- a/be/src/runtime/disk-io-mgr-handle-cache.inline.h +++ /dev/null @@ -1,231 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include <tuple> - -#include "runtime/disk-io-mgr-handle-cache.h" -#include "util/hash-util.h" -#include "util/time.h" - -#ifndef IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H -#define IMPALA_RUNTIME_DISK_IO_MGR_HANDLE_CACHE_INLINE_H - -namespace impala { - -HdfsFileHandle::HdfsFileHandle(const hdfsFS& fs, const char* fname, - int64_t mtime) - : fs_(fs), hdfs_file_(hdfsOpenFile(fs, fname, O_RDONLY, 0, 0, 0)), mtime_(mtime) { - ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(1L); - VLOG_FILE << "hdfsOpenFile() file=" << fname << " fid=" << hdfs_file_; -} - -HdfsFileHandle::~HdfsFileHandle() { - if (hdfs_file_ != nullptr && fs_ != nullptr) { - ImpaladMetrics::IO_MGR_NUM_CACHED_FILE_HANDLES->Increment(-1L); - VLOG_FILE << "hdfsCloseFile() fid=" << hdfs_file_; - hdfsCloseFile(fs_, hdfs_file_); - } - fs_ = nullptr; - hdfs_file_ = nullptr; -} - -template <size_t NUM_PARTITIONS> - FileHandleCache<NUM_PARTITIONS>::FileHandleCache(size_t capacity, - uint64_t unused_handle_timeout_secs) - : unused_handle_timeout_secs_(unused_handle_timeout_secs) { - DCHECK_GT(NUM_PARTITIONS, 0); - size_t remainder = capacity % NUM_PARTITIONS; - size_t base_capacity = capacity / NUM_PARTITIONS; - size_t partition_capacity = (remainder > 0 ? 
base_capacity + 1 : base_capacity); - for (FileHandleCachePartition& p : cache_partitions_) { - p.size = 0; - p.capacity = partition_capacity; - } -} - -template <size_t NUM_PARTITIONS> -FileHandleCache<NUM_PARTITIONS>::LruListEntry::LruListEntry( - typename MapType::iterator map_entry_in) - : map_entry(map_entry_in), timestamp_seconds(MonotonicSeconds()) {} - -template <size_t NUM_PARTITIONS> -FileHandleCache<NUM_PARTITIONS>::~FileHandleCache() { - shut_down_promise_.Set(true); - if (eviction_thread_ != nullptr) eviction_thread_->Join(); -} - -template <size_t NUM_PARTITIONS> -Status FileHandleCache<NUM_PARTITIONS>::Init() { - return Thread::Create("disk-io-mgr-handle-cache", "File Handle Timeout", - &FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop, this, &eviction_thread_); -} - -template <size_t NUM_PARTITIONS> -HdfsFileHandle* FileHandleCache<NUM_PARTITIONS>::GetFileHandle( - const hdfsFS& fs, std::string* fname, int64_t mtime, bool require_new_handle, - bool* cache_hit) { - // Hash the key and get appropriate partition - int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS; - FileHandleCachePartition& p = cache_partitions_[index]; - boost::lock_guard<SpinLock> g(p.lock); - pair<typename MapType::iterator, typename MapType::iterator> range = - p.cache.equal_range(*fname); - - // If this requires a new handle, skip to the creation codepath. Otherwise, - // find an unused entry with the same mtime - FileHandleEntry* ret_elem = nullptr; - if (!require_new_handle) { - while (range.first != range.second) { - FileHandleEntry* elem = &range.first->second; - if (!elem->in_use && elem->fh->mtime() == mtime) { - // This element is currently in the lru_list, which means that lru_entry must - // be an iterator pointing into the lru_list. - DCHECK(elem->lru_entry != p.lru_list.end()); - // Remove the element from the lru_list and designate that it is not on - // the lru_list by resetting its iterator to point to the end of the list. 
- p.lru_list.erase(elem->lru_entry); - elem->lru_entry = p.lru_list.end(); - ret_elem = elem; - *cache_hit = true; - break; - } - ++range.first; - } - } - - // There was no entry that was free or caller asked for a new handle - if (!ret_elem) { - *cache_hit = false; - // Create a new entry and move it into the map - HdfsFileHandle* new_fh = new HdfsFileHandle(fs, fname->data(), mtime); - if (!new_fh->ok()) { - delete new_fh; - return nullptr; - } - FileHandleEntry entry(new_fh, p.lru_list); - typename MapType::iterator new_it = p.cache.emplace_hint(range.second, - *fname, std::move(entry)); - ret_elem = &new_it->second; - ++p.size; - if (p.size > p.capacity) EvictHandles(p); - } - - DCHECK(ret_elem->fh.get() != nullptr); - DCHECK(!ret_elem->in_use); - ret_elem->in_use = true; - ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L); - return ret_elem->fh.get(); -} - -template <size_t NUM_PARTITIONS> -void FileHandleCache<NUM_PARTITIONS>::ReleaseFileHandle(std::string* fname, - HdfsFileHandle* fh, bool destroy_handle) { - DCHECK(fh != nullptr); - // Hash the key and get appropriate partition - int index = HashUtil::Hash(fname->data(), fname->size(), 0) % NUM_PARTITIONS; - FileHandleCachePartition& p = cache_partitions_[index]; - boost::lock_guard<SpinLock> g(p.lock); - pair<typename MapType::iterator, typename MapType::iterator> range = - p.cache.equal_range(*fname); - - // TODO: This can be optimized by maintaining some state in the file handle about - // its location in the map. 
- typename MapType::iterator release_it = range.first; - while (release_it != range.second) { - FileHandleEntry* elem = &release_it->second; - if (elem->fh.get() == fh) break; - ++release_it; - } - DCHECK(release_it != range.second); - - // This file handle is no longer referenced - FileHandleEntry* release_elem = &release_it->second; - DCHECK(release_elem->in_use); - release_elem->in_use = false; - ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L); - if (destroy_handle) { - --p.size; - p.cache.erase(release_it); - return; - } - // Hdfs can use some memory for readahead buffering. Calling unbuffer reduces - // this buffering so that the file handle takes up less memory when in the cache. - // If unbuffering is not supported, then hdfsUnbufferFile() will return a non-zero - // return code, and we close the file handle and remove it from the cache. - if (hdfsUnbufferFile(release_elem->fh->file()) == 0) { - // This FileHandleEntry must not be in the lru list already, because it was - // in use. Verify this by checking that the lru_entry is pointing to the end, - // which cannot be true for any element in the lru list. - DCHECK(release_elem->lru_entry == p.lru_list.end()); - // Add this to the lru list, establishing links in both directions. - // The FileHandleEntry has an iterator to the LruListEntry and the - // LruListEntry has an iterator to the location of the FileHandleEntry in - // the cache. 
- release_elem->lru_entry = p.lru_list.emplace(p.lru_list.end(), release_it); - if (p.size > p.capacity) EvictHandles(p); - } else { - VLOG_FILE << "FS does not support file handle unbuffering, closing file=" - << fname; - --p.size; - p.cache.erase(release_it); - } -} - -template <size_t NUM_PARTITIONS> -void FileHandleCache<NUM_PARTITIONS>::EvictHandlesLoop() { - while (true) { - for (FileHandleCachePartition& p : cache_partitions_) { - boost::lock_guard<SpinLock> g(p.lock); - EvictHandles(p); - } - // This Get() will time out until shutdown, when the promise is set. - bool timed_out; - shut_down_promise_.Get(EVICT_HANDLES_PERIOD_MS, &timed_out); - if (!timed_out) break; - } - // The promise must be set to true. - DCHECK(shut_down_promise_.IsSet()); - DCHECK(shut_down_promise_.Get()); -} - -template <size_t NUM_PARTITIONS> -void FileHandleCache<NUM_PARTITIONS>::EvictHandles( - FileHandleCache<NUM_PARTITIONS>::FileHandleCachePartition& p) { - uint64_t now = MonotonicSeconds(); - uint64_t oldest_allowed_timestamp = - now > unused_handle_timeout_secs_ ? now - unused_handle_timeout_secs_ : 0; - while (p.lru_list.size() > 0) { - // Peek at the oldest element - LruListEntry oldest_entry = p.lru_list.front(); - typename MapType::iterator oldest_entry_map_it = oldest_entry.map_entry; - uint64_t oldest_entry_timestamp = oldest_entry.timestamp_seconds; - // If the oldest element does not need to be aged out and the cache is not over - // capacity, then we are done and there is nothing to evict. 
- if (p.size <= p.capacity && (unused_handle_timeout_secs_ == 0 || - oldest_entry_timestamp >= oldest_allowed_timestamp)) { - return; - } - // Evict the oldest element - DCHECK(!oldest_entry_map_it->second.in_use); - p.cache.erase(oldest_entry_map_it); - p.lru_list.pop_front(); - --p.size; - } -} - -} -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-internal.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h deleted file mode 100644 index cc50af7..0000000 --- a/be/src/runtime/disk-io-mgr-internal.h +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#ifndef IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H -#define IMPALA_RUNTIME_DISK_IO_MGR_INTERNAL_H - -#include <unistd.h> -#include <queue> -#include <boost/thread/locks.hpp> -#include <gutil/strings/substitute.h> - -#include "common/logging.h" -#include "runtime/disk-io-mgr-reader-context.h" -#include "runtime/disk-io-mgr.h" -#include "runtime/mem-tracker.h" -#include "runtime/thread-resource-mgr.h" -#include "util/condition-variable.h" -#include "util/cpu-info.h" -#include "util/debug-util.h" -#include "util/disk-info.h" -#include "util/filesystem-util.h" -#include "util/hdfs-util.h" -#include "util/impalad-metrics.h" - -/// This file contains internal structures shared between submodules of the IoMgr. Users -/// of the IoMgr do not need to include this file. -namespace impala { - -/// Per disk state -struct DiskIoMgr::DiskQueue { - /// Disk id (0-based) - int disk_id; - - /// Lock that protects access to 'request_contexts' and 'work_available' - boost::mutex lock; - - /// Condition variable to signal the disk threads that there is work to do or the - /// thread should shut down. A disk thread will be woken up when there is a reader - /// added to the queue. A reader is only on the queue when it has at least one - /// scan range that is not blocked on available buffers. - ConditionVariable work_available; - - /// list of all request contexts that have work queued on this disk - std::list<DiskIoRequestContext*> request_contexts; - - /// Enqueue the request context to the disk queue. The DiskQueue lock must not be taken. 
- inline void EnqueueContext(DiskIoRequestContext* worker) { - { - boost::unique_lock<boost::mutex> disk_lock(lock); - /// Check that the reader is not already on the queue - DCHECK(find(request_contexts.begin(), request_contexts.end(), worker) == - request_contexts.end()); - request_contexts.push_back(worker); - } - work_available.NotifyAll(); - } - - DiskQueue(int id) : disk_id(id) {} -}; -} - -#endif http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/b840137c/be/src/runtime/disk-io-mgr-reader-context.cc ---------------------------------------------------------------------- diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc deleted file mode 100644 index d62545b..0000000 --- a/be/src/runtime/disk-io-mgr-reader-context.cc +++ /dev/null @@ -1,292 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "runtime/disk-io-mgr-internal.h" - -#include "common/names.h" - -using namespace impala; - -void DiskIoRequestContext::Cancel(const Status& status) { - DCHECK(!status.ok()); - - // Callbacks are collected in this vector and invoked while no lock is held. 
- vector<WriteRange::WriteDoneCallback> write_callbacks; - { - lock_guard<mutex> lock(lock_); - DCHECK(Validate()) << endl << DebugString(); - - // Already being cancelled - if (state_ == DiskIoRequestContext::Cancelled) return; - - DCHECK(status_.ok()); - status_ = status; - - // The reader will be put into a cancelled state until call cleanup is complete. - state_ = DiskIoRequestContext::Cancelled; - - // Cancel all scan ranges for this reader. Each range could be one one of - // four queues. - for (int i = 0; i < disk_states_.size(); ++i) { - DiskIoRequestContext::PerDiskState& state = disk_states_[i]; - RequestRange* range = NULL; - while ((range = state.in_flight_ranges()->Dequeue()) != NULL) { - if (range->request_type() == RequestType::READ) { - static_cast<ScanRange*>(range)->Cancel(status); - } else { - DCHECK(range->request_type() == RequestType::WRITE); - write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_); - } - } - - ScanRange* scan_range; - while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != NULL) { - scan_range->Cancel(status); - } - WriteRange* write_range; - while ((write_range = state.unstarted_write_ranges()->Dequeue()) != NULL) { - write_callbacks.push_back(write_range->callback_); - } - } - - ScanRange* range = NULL; - while ((range = ready_to_start_ranges_.Dequeue()) != NULL) { - range->Cancel(status); - } - while ((range = blocked_ranges_.Dequeue()) != NULL) { - range->Cancel(status); - } - while ((range = cached_ranges_.Dequeue()) != NULL) { - range->Cancel(status); - } - - // Schedule reader on all disks. The disks will notice it is cancelled and do any - // required cleanup - for (int i = 0; i < disk_states_.size(); ++i) { - DiskIoRequestContext::PerDiskState& state = disk_states_[i]; - state.ScheduleContext(this, i); - } - } - - for (const WriteRange::WriteDoneCallback& write_callback: write_callbacks) { - write_callback(status_); - } - - // Signal reader and unblock the GetNext/Read thread. 
That read will fail with - // a cancelled status. - ready_to_start_ranges_cv_.NotifyAll(); -} - -void DiskIoRequestContext::CancelAndMarkInactive() { - Cancel(Status::CANCELLED); - - boost::unique_lock<boost::mutex> l(lock_); - DCHECK_NE(state_, Inactive); - DCHECK(Validate()) << endl << DebugString(); - - // Wait until the ranges finish up. - while (num_disks_with_ranges_ > 0) disks_complete_cond_var_.Wait(l); - - // Validate that no buffers were leaked from this context. - DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString(); - DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString(); - DCHECK(Validate()) << endl << DebugString(); - state_ = Inactive; -} - -void DiskIoRequestContext::AddRequestRange( - DiskIoMgr::RequestRange* range, bool schedule_immediately) { - // DCHECK(lock_.is_locked()); // TODO: boost should have this API - DiskIoRequestContext::PerDiskState& state = disk_states_[range->disk_id()]; - if (state.done()) { - DCHECK_EQ(state.num_remaining_ranges(), 0); - state.set_done(false); - ++num_disks_with_ranges_; - } - - bool schedule_context; - if (range->request_type() == RequestType::READ) { - DiskIoMgr::ScanRange* scan_range = static_cast<DiskIoMgr::ScanRange*>(range); - if (schedule_immediately) { - ScheduleScanRange(scan_range); - } else { - state.unstarted_scan_ranges()->Enqueue(scan_range); - num_unstarted_scan_ranges_.Add(1); - } - // If next_scan_range_to_start is NULL, schedule this DiskIoRequestContext so that it will - // be set. If it's not NULL, this context will be scheduled when GetNextRange() is - // invoked. - schedule_context = state.next_scan_range_to_start() == NULL; - } else { - DCHECK(range->request_type() == RequestType::WRITE); - DCHECK(!schedule_immediately); - DiskIoMgr::WriteRange* write_range = static_cast<DiskIoMgr::WriteRange*>(range); - state.unstarted_write_ranges()->Enqueue(write_range); - - // ScheduleContext() has no effect if the context is already scheduled, - // so this is safe. 
- schedule_context = true; - } - - if (schedule_context) state.ScheduleContext(this, range->disk_id()); - ++state.num_remaining_ranges(); -} - -DiskIoRequestContext::DiskIoRequestContext( - DiskIoMgr* parent, int num_disks, MemTracker* tracker) - : parent_(parent), mem_tracker_(tracker), disk_states_(num_disks) {} - -// Dumps out request context information. Lock should be taken by caller -string DiskIoRequestContext::DebugString() const { - stringstream ss; - ss << endl << " DiskIoRequestContext: " << (void*)this << " (state="; - if (state_ == DiskIoRequestContext::Inactive) ss << "Inactive"; - if (state_ == DiskIoRequestContext::Cancelled) ss << "Cancelled"; - if (state_ == DiskIoRequestContext::Active) ss << "Active"; - if (state_ != DiskIoRequestContext::Inactive) { - ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail()) - << " #ready_buffers=" << num_ready_buffers_.Load() - << " #used_buffers=" << num_used_buffers_.Load() - << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load() - << " #finished_scan_ranges=" << num_finished_ranges_.Load() - << " #disk_with_ranges=" << num_disks_with_ranges_ - << " #disks=" << num_disks_with_ranges_; - for (int i = 0; i < disk_states_.size(); ++i) { - ss << endl << " " << i << ": " - << "is_on_queue=" << disk_states_[i].is_on_queue() - << " done=" << disk_states_[i].done() - << " #num_remaining_scan_ranges=" << disk_states_[i].num_remaining_ranges() - << " #in_flight_ranges=" << disk_states_[i].in_flight_ranges()->size() - << " #unstarted_scan_ranges=" << disk_states_[i].unstarted_scan_ranges()->size() - << " #unstarted_write_ranges=" - << disk_states_[i].unstarted_write_ranges()->size() - << " #reading_threads=" << disk_states_[i].num_threads_in_op(); - } - } - ss << ")"; - return ss.str(); -} - -bool DiskIoRequestContext::Validate() const { - if (state_ == DiskIoRequestContext::Inactive) { - LOG(WARNING) << "state_ == DiskIoRequestContext::Inactive"; - return false; - } - - if (num_used_buffers_.Load() < 0) 
{ - LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_.Load(); - return false; - } - - if (num_ready_buffers_.Load() < 0) { - LOG(WARNING) << "num_ready_buffers_ < 0: #used=" << num_ready_buffers_.Load(); - return false; - } - - int total_unstarted_ranges = 0; - for (int i = 0; i < disk_states_.size(); ++i) { - const PerDiskState& state = disk_states_[i]; - bool on_queue = state.is_on_queue(); - int num_reading_threads = state.num_threads_in_op(); - - total_unstarted_ranges += state.unstarted_scan_ranges()->size(); - - if (num_reading_threads < 0) { - LOG(WARNING) << "disk_id=" << i - << "state.num_threads_in_read < 0: #threads=" - << num_reading_threads; - return false; - } - - if (state_ != DiskIoRequestContext::Cancelled) { - if (state.unstarted_scan_ranges()->size() + state.in_flight_ranges()->size() > - state.num_remaining_ranges()) { - LOG(WARNING) << "disk_id=" << i - << " state.unstarted_ranges.size() + state.in_flight_ranges.size()" - << " > state.num_remaining_ranges:" - << " #unscheduled=" << state.unstarted_scan_ranges()->size() - << " #in_flight=" << state.in_flight_ranges()->size() - << " #remaining=" << state.num_remaining_ranges(); - return false; - } - - // If we have an in_flight range, the reader must be on the queue or have a - // thread actively reading for it. - if (!state.in_flight_ranges()->empty() && !on_queue && num_reading_threads == 0) { - LOG(WARNING) << "disk_id=" << i - << " reader has inflight ranges but is not on the disk queue." - << " #in_flight_ranges=" << state.in_flight_ranges()->size() - << " #reading_threads=" << num_reading_threads - << " on_queue=" << on_queue; - return false; - } - - if (state.done() && num_reading_threads > 0) { - LOG(WARNING) << "disk_id=" << i - << " state set to done but there are still threads working." 
- << " #reading_threads=" << num_reading_threads; - return false; - } - } else { - // Is Cancelled - if (!state.in_flight_ranges()->empty()) { - LOG(WARNING) << "disk_id=" << i - << "Reader cancelled but has in flight ranges."; - return false; - } - if (!state.unstarted_scan_ranges()->empty()) { - LOG(WARNING) << "disk_id=" << i - << "Reader cancelled but has unstarted ranges."; - return false; - } - } - - if (state.done() && on_queue) { - LOG(WARNING) << "disk_id=" << i - << " state set to done but the reader is still on the disk queue." - << " state.done=true and state.is_on_queue=true"; - return false; - } - } - - if (state_ != DiskIoRequestContext::Cancelled) { - if (total_unstarted_ranges != num_unstarted_scan_ranges_.Load()) { - LOG(WARNING) << "total_unstarted_ranges=" << total_unstarted_ranges - << " sum_in_states=" << num_unstarted_scan_ranges_.Load(); - return false; - } - } else { - if (!ready_to_start_ranges_.empty()) { - LOG(WARNING) << "Reader cancelled but has ready to start ranges."; - return false; - } - if (!blocked_ranges_.empty()) { - LOG(WARNING) << "Reader cancelled but has blocked ranges."; - return false; - } - } - - return true; -} - -void DiskIoRequestContext::PerDiskState::ScheduleContext( - DiskIoRequestContext* context, int disk_id) { - if (!is_on_queue_ && !done_) { - is_on_queue_ = true; - context->parent_->disk_queues_[disk_id]->EnqueueContext(context); - } -}