This is an automated email from the ASF dual-hosted git repository.

dbecker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git
commit 4e87a80fae86c5775a5f35607806fee287b3797a
Author: Michael Smith <[email protected]>
AuthorDate: Fri Oct 28 10:57:48 2022 -0700

    IMPALA-9488: Add metrics for EC reads

    Adds a metric tracking erasure-coded bytes read. Adds ScanRange::FileInfo
    to pass file info through calls of AllocateScanRange so it's easier to
    add erasure coding as another piece of file info.

    Adds a test to verify that the expected number of bytes are read for
    existing read metrics and the new `erasure-coded-bytes-read` metric
    when doing a select.

    Change-Id: Ieb06bac9dea4b632621653d2935e9a7b2dc81341
    Reviewed-on: http://gerrit.cloudera.org:8080/19178
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/base-sequence-scanner.cc               |  7 +-
 be/src/exec/hdfs-scan-node-base.cc                 | 42 +++++++-----
 be/src/exec/hdfs-scan-node-base.h                  | 22 +++---
 be/src/exec/hdfs-scanner.cc                        | 13 ++--
 be/src/exec/orc/hdfs-orc-scanner.cc                | 13 ++--
 be/src/exec/parquet/hdfs-parquet-scanner.cc        |  4 +-
 be/src/exec/parquet/parquet-page-index.cc          | 10 ++-
 be/src/exec/parquet/parquet-page-reader.cc         |  9 +--
 be/src/exec/scanner-context.cc                     |  5 +-
 be/src/exec/text/hdfs-text-scanner.cc              |  7 +-
 be/src/runtime/io/disk-io-mgr-stress.cc            |  4 +-
 be/src/runtime/io/disk-io-mgr-test.cc              | 36 +++++-----
 be/src/runtime/io/hdfs-file-reader.cc              |  3 +
 be/src/runtime/io/request-context.h                |  4 ++
 be/src/runtime/io/request-ranges.h                 | 34 +++++++---
 be/src/runtime/io/scan-range.cc                    | 38 +++++------
 be/src/runtime/tmp-file-mgr.cc                     | 10 +--
 be/src/scheduling/scheduler.cc                     |  2 +
 be/src/util/impalad-metrics.cc                     |  5 ++
 be/src/util/impalad-metrics.h                      |  4 ++
 common/thrift/PlanNodes.thrift                     |  3 +
 common/thrift/metrics.json                         | 10 +++
 .../org/apache/impala/planner/HdfsScanNode.java    |  1 +
 tests/query_test/test_io_metrics.py                | 79 ++++++++++++++++++++++
 24 files changed, 254 insertions(+), 111 deletions(-)

diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index ae66d0e01..83e1221b1 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -62,10 +62,9 @@ Status BaseSequenceScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
     bool expected_local = false;
     int cache_options = !scan_node->IsDataCacheDisabled() ?
         BufferOpts::USE_DATA_CACHE : BufferOpts::NO_CACHING;
-    ScanRange* header_range = scan_node->AllocateScanRange(files[i]->fs,
-        files[i]->filename.c_str(), header_size, 0, metadata->partition_id, -1,
-        expected_local, files[i]->mtime, BufferOpts(cache_options),
-        metadata->original_split);
+    ScanRange* header_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+        header_size, 0, metadata->partition_id, -1, expected_local,
+        BufferOpts(cache_options), metadata->original_split);
     ScanRangeMetadata* header_metadata =
         static_cast<ScanRangeMetadata*>(header_range->meta_data());
     header_metadata->is_sequence_header = true;
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b5bab2c13..f6f1f008d 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -111,6 +111,8 @@ PROFILE_DEFINE_COUNTER(BytesReadShortCircuit, STABLE_LOW, TUnit::BYTES,
     "The total number of bytes read via short circuit read");
 PROFILE_DEFINE_COUNTER(BytesReadDataNodeCache, STABLE_HIGH, TUnit::BYTES,
     "The total number of bytes read from data node cache");
+PROFILE_DEFINE_COUNTER(BytesReadErasureCoded, STABLE_LOW, TUnit::BYTES,
+    "The total number of bytes read from erasure-coded data");
 PROFILE_DEFINE_COUNTER(RemoteScanRanges, STABLE_HIGH, TUnit::UNIT,
     "The total number of remote scan ranges");
 PROFILE_DEFINE_COUNTER(BytesReadRemoteUnexpected, STABLE_LOW, TUnit::BYTES,
@@ -298,6 +300,7 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
       file_desc->file_length = split.file_length();
       file_desc->mtime = split.mtime();
       file_desc->file_compression = CompressionTypePBToThrift(split.file_compression());
+      file_desc->is_erasure_coded = split.is_erasure_coded();
       file_desc->file_format = partition_desc->file_format();
       file_desc->file_metadata = file_metadata;
       RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(
@@ -328,10 +331,9 @@ Status HdfsScanPlanNode::ProcessScanRangesAndInitSharedState(FragmentState* stat
     }
     ScanRangeMetadata* metadata =
         obj_pool->Add(new ScanRangeMetadata(split.partition_id(), nullptr));
-    file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool, file_desc->fs,
-        file_desc->filename.c_str(), split.length(), split.offset(), {}, metadata,
-        params.volume_id(), expected_local, file_desc->mtime,
-        BufferOpts(cache_options)));
+    file_desc->splits.push_back(ScanRange::AllocateScanRange(obj_pool,
+        file_desc->GetFileInfo(), split.length(), split.offset(), {}, metadata,
+        params.volume_id(), expected_local, BufferOpts(cache_options)));
     total_splits++;
   }
   // Update server wide metrics for number of scan ranges and ranges that have
@@ -608,6 +610,7 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   bytes_read_short_circuit_ =
       PROFILE_BytesReadShortCircuit.Instantiate(runtime_profile());
   bytes_read_dn_cache_ = PROFILE_BytesReadDataNodeCache.Instantiate(runtime_profile());
+  bytes_read_ec_ = PROFILE_BytesReadErasureCoded.Instantiate(runtime_profile());
   num_remote_ranges_ = PROFILE_RemoteScanRanges.Instantiate(runtime_profile());
   unexpected_remote_bytes_ =
       PROFILE_BytesReadRemoteUnexpected.Instantiate(runtime_profile());
@@ -822,37 +825,37 @@ int64_t HdfsScanNodeBase::IncreaseReservationIncrementally(int64_t curr_reservat
   return curr_reservation;
 }
 
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(const ScanRange::FileInfo &fi,
     int64_t len, int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
-    int64_t mtime, const BufferOpts& buffer_opts, const ScanRange* original_split) {
+    const BufferOpts& buffer_opts, const ScanRange* original_split) {
   ScanRangeMetadata* metadata =
       shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
-  return AllocateScanRange(fs, file, len, offset, {}, metadata, disk_id, expected_local,
-      mtime, buffer_opts);
+  return AllocateScanRange(fi, len, offset, {}, metadata, disk_id, expected_local,
+      buffer_opts);
 }
 
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(const ScanRange::FileInfo &fi,
     int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges,
-    int64_t partition_id, int disk_id, bool expected_local, int64_t mtime,
+    int64_t partition_id, int disk_id, bool expected_local,
     const BufferOpts& buffer_opts, const ScanRange* original_split) {
   ScanRangeMetadata* metadata =
       shared_state_->obj_pool()->Add(new ScanRangeMetadata(partition_id, original_split));
-  return AllocateScanRange(fs, file, len, offset, move(sub_ranges), metadata,
-      disk_id, expected_local, mtime, buffer_opts);
+  return AllocateScanRange(fi, len, offset, move(sub_ranges), metadata,
+      disk_id, expected_local, buffer_opts);
 }
 
-ScanRange* HdfsScanNodeBase::AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
-    int64_t offset, vector<ScanRange::SubRange>&& sub_ranges, ScanRangeMetadata* metadata,
-    int disk_id, bool expected_local, int64_t mtime,
+ScanRange* HdfsScanNodeBase::AllocateScanRange(const ScanRange::FileInfo &fi,
+    int64_t len, int64_t offset, vector<ScanRange::SubRange>&& sub_ranges,
+    ScanRangeMetadata* metadata, int disk_id, bool expected_local,
     const BufferOpts& buffer_opts) {
   // Require that the scan range is within [0, file_length). While this cannot be used
   // to guarantee safety (file_length metadata may be stale), it avoids different
   // behavior between Hadoop FileSystems (e.g. s3n hdfsSeek() returns error when seeking
   // beyond the end of the file).
-  DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, file)->file_length)
+  DCHECK_LE(offset + len, GetFileDesc(metadata->partition_id, fi.filename)->file_length)
       << "Scan range beyond end of file (offset=" << offset << ", len=" << len << ")";
-  return ScanRange::AllocateScanRange(shared_state_->obj_pool(), fs, file, len, offset,
-      move(sub_ranges), metadata, disk_id, expected_local, mtime, buffer_opts);
+  return ScanRange::AllocateScanRange(shared_state_->obj_pool(), fi, len, offset,
+      move(sub_ranges), metadata, disk_id, expected_local, buffer_opts);
 }
 
 const CodegenFnPtrBase* HdfsScanNodeBase::GetCodegenFn(THdfsFileFormat::type type) {
@@ -1213,6 +1216,7 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
     bytes_read_local_->Set(reader_context_->bytes_read_local());
     bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit());
     bytes_read_dn_cache_->Set(reader_context_->bytes_read_dn_cache());
+    bytes_read_ec_->Set(reader_context_->bytes_read_ec());
    num_remote_ranges_->Set(reader_context_->num_remote_ranges());
    unexpected_remote_bytes_->Set(reader_context_->unexpected_remote_bytes());
    cached_file_handles_hit_count_->Set(reader_context_->cached_file_handles_hit_count());
@@ -1237,6 +1241,8 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
           bytes_read_short_circuit_->value());
       ImpaladMetrics::IO_MGR_CACHED_BYTES_READ->Increment(
           bytes_read_dn_cache_->value());
+      ImpaladMetrics::IO_MGR_ERASURE_CODED_BYTES_READ->Increment(
+          bytes_read_ec_->value());
     }
   }
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 21d18f25c..053e61c70 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -71,6 +71,10 @@ struct HdfsFileDesc {
       file_compression(THdfsCompression::NONE),
       file_format(THdfsFileFormat::TEXT) {}
 
+  io::ScanRange::FileInfo GetFileInfo() const {
+    return io::ScanRange::FileInfo{filename.c_str(), fs, mtime, is_erasure_coded};
+  }
+
   /// Connection to the filesystem containing the file.
   hdfsFS fs;
 
@@ -93,6 +97,9 @@ struct HdfsFileDesc {
   /// Extra file metadata, e.g. Iceberg-related file-level info.
   const ::org::apache::impala::fb::FbFileMetadata* file_metadata;
 
+  /// Whether file is erasure coded.
+  bool is_erasure_coded = false;
+
   /// Some useful typedefs for creating HdfsFileDesc related data structures.
   /// This is a pair for partition ID and filename which uniquely identifies a file.
   typedef pair<int64_t, std::string> PartitionFileKey;
@@ -506,23 +513,21 @@ class HdfsScanNodeBase : public ScanNode {
   /// If not NULL, the 'original_split' pointer is stored for reference in the scan range
   /// metadata of the scan range that is to be allocated.
   /// This is thread safe.
-  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+  io::ScanRange* AllocateScanRange(const io::ScanRange::FileInfo &fi, int64_t len,
       int64_t offset, int64_t partition_id, int disk_id, bool expected_local,
-      int64_t mtime, const io::BufferOpts& buffer_opts,
-      const io::ScanRange* original_split = nullptr);
+      const io::BufferOpts& buffer_opts, const io::ScanRange* original_split = nullptr);
 
   /// Same as the first overload, but it takes sub-ranges as well.
-  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+  io::ScanRange* AllocateScanRange(const io::ScanRange::FileInfo &fi, int64_t len,
       int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
       int64_t partition_id, int disk_id, bool expected_local,
-      int64_t mtime, const io::BufferOpts& buffer_opts,
-      const io::ScanRange* original_split = nullptr);
+      const io::BufferOpts& buffer_opts, const io::ScanRange* original_split = nullptr);
 
   /// Same as above, but it takes both sub-ranges and metadata.
-  io::ScanRange* AllocateScanRange(hdfsFS fs, const char* file, int64_t len,
+  io::ScanRange* AllocateScanRange(const io::ScanRange::FileInfo &fi, int64_t len,
       int64_t offset, std::vector<io::ScanRange::SubRange>&& sub_ranges,
       ScanRangeMetadata* metadata, int disk_id, bool expected_local,
-      int64_t mtime, const io::BufferOpts& buffer_opts);
+      const io::BufferOpts& buffer_opts);
 
   /// Adds ranges to be read later by scanners. Must not be called once
   /// remaining_scan_range_submissions_ is 0. The enqueue_location specifies whether the
@@ -781,6 +786,7 @@ class HdfsScanNodeBase : public ScanNode {
   RuntimeProfile::Counter* bytes_read_local_ = nullptr;
   RuntimeProfile::Counter* bytes_read_short_circuit_ = nullptr;
   RuntimeProfile::Counter* bytes_read_dn_cache_ = nullptr;
+  RuntimeProfile::Counter* bytes_read_ec_ = nullptr;
   RuntimeProfile::Counter* num_remote_ranges_ = nullptr;
   RuntimeProfile::Counter* unexpected_remote_bytes_ = nullptr;
   RuntimeProfile::Counter* cached_file_handles_hit_count_ = nullptr;
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 7795a8c09..49f360ed1 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -888,10 +888,9 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
       // metadata associated with the footer range.
       ScanRange* footer_range;
       if (footer_split != nullptr) {
-        footer_range = scan_node->AllocateScanRange(files[i]->fs,
-            files[i]->filename.c_str(), footer_size, footer_start,
-            split_metadata->partition_id, footer_split->disk_id(),
-            footer_split->expected_local(), files[i]->mtime,
+        footer_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+            footer_size, footer_start, split_metadata->partition_id,
+            footer_split->disk_id(), footer_split->expected_local(),
             BufferOpts(footer_split->cache_options()), split);
       } else {
         // If we did not find the last split, we know it is going to be a remote read.
@@ -899,9 +898,9 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node,
         int cache_options = !scan_node->IsDataCacheDisabled() ?
             BufferOpts::USE_DATA_CACHE : BufferOpts::NO_CACHING;
         footer_range =
-            scan_node->AllocateScanRange(files[i]->fs, files[i]->filename.c_str(),
-                footer_size, footer_start, split_metadata->partition_id, -1,
-                expected_local, files[i]->mtime, BufferOpts(cache_options), split);
+            scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+                footer_size, footer_start, split_metadata->partition_id, -1,
+                expected_local, BufferOpts(cache_options), split);
       }
       footer_ranges.push_back(footer_range);
     } else {
diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc b/be/src/exec/orc/hdfs-orc-scanner.cc
index ce04bf178..12185af5b 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.cc
+++ b/be/src/exec/orc/hdfs-orc-scanner.cc
@@ -155,8 +155,9 @@ Status HdfsOrcScanner::ScanRangeInputStream::readRandom(
   bool expected_local = split_range->ExpectedLocalRead(offset, length);
   int cache_options = split_range->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* range = scanner_->scan_node_->AllocateScanRange(
-      metadata_range->fs(), scanner_->filename(), length, offset, partition_id,
-      split_range->disk_id(), expected_local, split_range->mtime(),
+      ScanRange::FileInfo{scanner_->filename(), metadata_range->fs(),
+          split_range->mtime(), split_range->is_erasure_coded()},
+      length, offset, partition_id, split_range->disk_id(), expected_local,
       BufferOpts::ReadInto(reinterpret_cast<uint8_t*>(buf), length, cache_options));
   unique_ptr<BufferDescriptor> io_buffer;
   Status status;
@@ -255,9 +256,11 @@ Status HdfsOrcScanner::StartColumnReading(const orc::StripeInformation& stripe)
       string msg = Substitute("Invalid read len.");
       return Status(msg);
     }
-    ScanRange* scan_range = scan_node_->AllocateScanRange(metadata_range_->fs(),
-        filename(), range.length_, range.offset_, partition_id, split_range->disk_id(),
-        col_range_local, split_range->mtime(), BufferOpts(split_range->cache_options()));
+    ScanRange* scan_range = scan_node_->AllocateScanRange(
+        ScanRange::FileInfo{filename(), metadata_range_->fs(), split_range->mtime(),
+            split_range->is_erasure_coded()},
+        range.length_, range.offset_, partition_id, split_range->disk_id(),
+        col_range_local, BufferOpts(split_range->cache_options()));
     RETURN_IF_ERROR(
         context_->AddAndStartStream(scan_range, range.io_reservation, &range.stream_));
   }
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 64e4f546e..df70ceb2e 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -1913,10 +1913,8 @@ Status HdfsParquetScanner::ReadToBuffer(uint64_t offset, uint8_t* buffer, uint64
   const int cache_options =
       metadata_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* object_range = scan_node_->AllocateScanRange(
-      metadata_range_->fs(), filename(), size,
-      offset, partition_id,
+      metadata_range_->GetFileInfo(), size, offset, partition_id,
       metadata_range_->disk_id(), metadata_range_->expected_local(),
-      metadata_range_->mtime(),
       BufferOpts::ReadInto(buffer, size, cache_options));
   unique_ptr<BufferDescriptor> io_buffer;
   bool needs_buffers;
diff --git a/be/src/exec/parquet/parquet-page-index.cc b/be/src/exec/parquet/parquet-page-index.cc
index e46edfa65..9925714ac 100644
--- a/be/src/exec/parquet/parquet-page-index.cc
+++ b/be/src/exec/parquet/parquet-page-index.cc
@@ -96,12 +96,10 @@ Status ParquetPageIndex::ReadAll(int row_group_idx) {
   int cache_options =
       scanner_->metadata_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
   ScanRange* object_range =
       scanner_->scan_node_->AllocateScanRange(
-          scanner_->metadata_range_->fs(), scanner_->filename(), scan_range_size,
-          scan_range_start, move(sub_ranges), partition_id,
-          scanner_->metadata_range_->disk_id(), scanner_->metadata_range_->expected_local(),
-          scanner_->metadata_range_->mtime(),
-          BufferOpts::ReadInto(page_index_buffer_.buffer(), page_index_buffer_.Size(),
-              cache_options));
+          scanner_->metadata_range_->GetFileInfo(), scan_range_size, scan_range_start,
+          move(sub_ranges), partition_id, scanner_->metadata_range_->disk_id(),
+          scanner_->metadata_range_->expected_local(), BufferOpts::ReadInto(
+              page_index_buffer_.buffer(), page_index_buffer_.Size(), cache_options));
   unique_ptr<BufferDescriptor> io_buffer;
   bool needs_buffers;
diff --git a/be/src/exec/parquet/parquet-page-reader.cc b/be/src/exec/parquet/parquet-page-reader.cc
index 972b326aa..3caa3ebda 100644
--- a/be/src/exec/parquet/parquet-page-reader.cc
+++ b/be/src/exec/parquet/parquet-page-reader.cc
@@ -87,10 +87,11 @@ Status ParquetPageReader::InitColumnChunk(const HdfsFileDesc& file_desc,
       static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
   // Determine if the column is completely contained within a local split.
   bool col_range_local = split_range->ExpectedLocalRead(col_start, col_len);
-  scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
-      filename(), col_len, col_start, move(sub_ranges),
-      partition_id, split_range->disk_id(),
-      col_range_local, file_desc.mtime, BufferOpts(split_range->cache_options()));
+  ScanRange::FileInfo fi = metadata_range->GetFileInfo();
+  fi.mtime = file_desc.mtime;
+  scan_range_ = parent_->scan_node_->AllocateScanRange(fi,
+      col_len, col_start, move(sub_ranges), partition_id, split_range->disk_id(),
+      col_range_local, BufferOpts(split_range->cache_options()));
   page_headers_read_ = 0;
   dictionary_header_encountered_ = false;
   state_ = State::Initialized;
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index a9ac2061d..2e453dc96 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -172,9 +172,8 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     // Disable HDFS caching as we are reading past the end.
     int cache_options = scan_range_->cache_options() & ~BufferOpts::USE_HDFS_CACHE;
     ScanRange* range = parent_->scan_node_->AllocateScanRange(
-        scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
-        scan_range_->disk_id(), expected_local, scan_range_->mtime(),
-        BufferOpts(cache_options));
+        scan_range_->GetFileInfo(), read_past_buffer_size, offset, partition_id,
+        scan_range_->disk_id(), expected_local, BufferOpts(cache_options));
     bool needs_buffers;
     RETURN_IF_ERROR(
         parent_->scan_node_->reader_context()->StartScanRange(range, &needs_buffers));
diff --git a/be/src/exec/text/hdfs-text-scanner.cc b/be/src/exec/text/hdfs-text-scanner.cc
index f36b21c67..0aae4faaf 100644
--- a/be/src/exec/text/hdfs-text-scanner.cc
+++ b/be/src/exec/text/hdfs-text-scanner.cc
@@ -145,10 +145,9 @@ Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
       DCHECK_GT(files[i]->file_length, 0);
       ScanRangeMetadata* metadata =
           static_cast<ScanRangeMetadata*>(split->meta_data());
-      ScanRange* file_range = scan_node->AllocateScanRange(files[i]->fs,
-          files[i]->filename.c_str(), files[i]->file_length, 0,
-          metadata->partition_id, split->disk_id(), split->expected_local(),
-          files[i]->mtime, BufferOpts(split->cache_options()));
+      ScanRange* file_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
+          files[i]->file_length, 0, metadata->partition_id, split->disk_id(),
+          split->expected_local(), BufferOpts(split->cache_options()));
       compressed_text_scan_ranges.push_back(file_range);
       scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
     }
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index afe65e39d..173899edc 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -261,8 +261,8 @@ void DiskIoMgrStress::NewClient(int i) {
       range_len = min(range_len, file_len - assigned_len);
 
       ScanRange* range = client.obj_pool.Add(new ScanRange);
-      range->Reset(NULL, files_[client.file_idx].filename.c_str(), range_len, assigned_len,
-          0, false, ScanRange::INVALID_MTIME, BufferOpts::Uncached());
+      range->Reset(ScanRange::FileInfo{files_[client.file_idx].filename.c_str()},
+          range_len, assigned_len, 0, false, BufferOpts::Uncached());
       client.scan_ranges.push_back(range);
       assigned_len += range_len;
     }
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 52741f0f8..1e5f9240b 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -131,8 +131,8 @@ class DiskIoMgrTest : public testing::Test {
     }
     if (status.ok()) {
       ScanRange* scan_range = pool_.Add(new ScanRange());
-      scan_range->Reset(nullptr, (*written_range)->file(), (*written_range)->len(),
-          (*written_range)->offset(), 0, false, ScanRange::INVALID_MTIME,
+      scan_range->Reset(ScanRange::FileInfo{(*written_range)->file()},
+          (*written_range)->len(), (*written_range)->offset(), 0, false,
           BufferOpts::Uncached());
       ValidateSyncRead(io_mgr, reader, client, scan_range,
           reinterpret_cast<const char*>(data), sizeof(int32_t));
@@ -300,8 +300,8 @@ class DiskIoMgrTest : public testing::Test {
     ScanRange* range = pool->Add(new ScanRange);
     int cache_options = is_hdfs_cached ?
         BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
-    range->Reset(nullptr, file_path, len, offset, disk_id, true, mtime,
-        BufferOpts(cache_options), move(sub_ranges), meta_data);
+    range->Reset(ScanRange::FileInfo{file_path, nullptr, mtime}, len, offset, disk_id,
+        true, BufferOpts(cache_options), move(sub_ranges), meta_data);
     EXPECT_EQ(mtime, range->mtime());
     return range;
   }
@@ -1591,7 +1591,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
     vector<uint8_t> client_buffer(buffer_len);
     int scan_len = min(len, buffer_len);
     ScanRange* range = pool_.Add(new ScanRange);
-    range->Reset(nullptr, tmp_file, scan_len, 0, 0, true, ScanRange::INVALID_MTIME,
+    range->Reset(ScanRange::FileInfo{tmp_file}, scan_len, 0, 0, true,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING));
     bool needs_buffers;
     ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
@@ -1638,8 +1638,8 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferSubRanges) {
     vector<uint8_t> client_buffer(result_len);
     ScanRange* range = pool_.Add(new ScanRange);
     int cache_options = fake_cache ? BufferOpts::USE_HDFS_CACHE : BufferOpts::NO_CACHING;
-    range->Reset(nullptr, tmp_file, data_len, 0, 0, true, stat_val.st_mtime,
-        BufferOpts::ReadInto(cache_options, client_buffer.data(), result_len),
+    range->Reset(ScanRange::FileInfo{tmp_file, nullptr, stat_val.st_mtime}, data_len, 0,
+        0, true, BufferOpts::ReadInto(cache_options, client_buffer.data(), result_len),
         move(sub_ranges));
     if (fake_cache) {
       SetReaderStub(range, make_unique<CacheReaderTestStub>(range, cache, data_len));
@@ -1688,7 +1688,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
       LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
   unique_ptr<RequestContext> reader = io_mgr->RegisterContext();
   ScanRange* range = pool_.Add(new ScanRange);
-  range->Reset(nullptr, tmp_file, SCAN_LEN, 0, 0, true, ScanRange::INVALID_MTIME,
+  range->Reset(ScanRange::FileInfo{tmp_file}, SCAN_LEN, 0, 0, true,
       BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN, BufferOpts::NO_CACHING));
   bool needs_buffers;
   ASSERT_OK(reader->StartScanRange(range, &needs_buffers));
@@ -2059,8 +2059,9 @@ TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) {
     auto data = datas.at(i);
     size_t buffer_len = sizeof(int32_t);
     vector<uint8_t> client_buffer(buffer_len);
-    scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
-        range->offset(), 0, false, mtime,
+    scan_range->Reset(
+        ScanRange::FileInfo{range->file(), hdfsConnect("default", 0), mtime},
+        range->len(), range->offset(), 0, false,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
         nullptr, (*new_tmp_file_obj)->DiskFile(), (*new_tmp_file_obj)->DiskBufferFile());
     bool needs_buffers;
@@ -2085,8 +2086,9 @@ TEST_F(DiskIoMgrTest, WriteToRemoteSuccess) {
     auto data = datas.at(i);
     size_t buffer_len = sizeof(int32_t);
     vector<uint8_t> client_buffer(buffer_len);
-    scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
-        range->offset(), 0, false, mtime,
+    scan_range->Reset(
+        ScanRange::FileInfo{range->file(), hdfsConnect("default", 0), mtime},
+        range->len(), range->offset(), 0, false,
         BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
         nullptr, (*new_tmp_file_obj)->DiskFile(), (*new_tmp_file_obj)->DiskBufferFile());
     bool needs_buffers;
@@ -2169,8 +2171,9 @@ TEST_F(DiskIoMgrTest, WriteToRemotePartialFileSuccess) {
   ScanRange* scan_range = tmp_pool.Add(new ScanRange);
   size_t buffer_len = sizeof(int32_t);
   vector<uint8_t> client_buffer(buffer_len);
-  scan_range->Reset(hdfsConnect("default", 0), (*new_range)->file(), (*new_range)->len(),
-      (*new_range)->offset(), 0, false, mtime,
+  scan_range->Reset(
+      ScanRange::FileInfo{(*new_range)->file(), hdfsConnect("default", 0), mtime},
+      (*new_range)->len(), (*new_range)->offset(), 0, false,
      BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
      nullptr, (*new_tmp_file_obj)->DiskFile(), (*new_tmp_file_obj)->DiskBufferFile());
   bool needs_buffers;
@@ -2588,8 +2591,9 @@ TEST_F(DiskIoMgrTest, WriteToRemoteFileDeleted) {
   auto range = ranges.at(0);
   size_t buffer_len = sizeof(int32_t);
   vector<uint8_t> client_buffer(buffer_len);
-  scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
-      range->offset(), 0, false, 1000000,
+  scan_range->Reset(
+      ScanRange::FileInfo{range->file(), hdfsConnect("default", 0), 1000000},
+      range->len(), range->offset(), 0, false,
       BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
       nullptr, tmp_file.DiskFile(), tmp_file.DiskBufferFile());
   bool needs_buffers;
diff --git a/be/src/runtime/io/hdfs-file-reader.cc b/be/src/runtime/io/hdfs-file-reader.cc
index 0b98441a2..9dfef6aae 100644
--- a/be/src/runtime/io/hdfs-file-reader.cc
+++ b/be/src/runtime/io/hdfs-file-reader.cc
@@ -412,6 +412,9 @@ void HdfsFileReader::GetHdfsStatistics(hdfsFile hdfs_file, bool log_stats) {
       scan_range_->reader_->bytes_read_short_circuit_.Add(
           stats->totalShortCircuitBytesRead);
       scan_range_->reader_->bytes_read_dn_cache_.Add(stats->totalZeroCopyBytesRead);
+      if (scan_range_->is_erasure_coded()) {
+        scan_range_->reader_->bytes_read_ec_.Add(stats->totalBytesRead);
+      }
       if (stats->totalLocalBytesRead != stats->totalBytesRead) {
         num_remote_bytes_ += stats->totalBytesRead - stats->totalLocalBytesRead;
       }
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 96d609d93..646908374 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -158,6 +158,7 @@ class RequestContext {
   int64_t bytes_read_local() const { return bytes_read_local_.Load(); }
   int64_t bytes_read_short_circuit() const { return bytes_read_short_circuit_.Load(); }
   int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); }
+  int64_t bytes_read_ec() const { return bytes_read_ec_.Load(); }
   int num_remote_ranges() const { return num_remote_ranges_.Load(); }
   int64_t unexpected_remote_bytes() const { return unexpected_remote_bytes_.Load(); }
 
@@ -399,6 +400,9 @@ class RequestContext {
   /// Total number of bytes read from date node cache, updated at end of each range scan
   AtomicInt64 bytes_read_dn_cache_{0};
 
+  /// Total number of erasure-coded bytes read, updated at end of each range scan
+  AtomicInt64 bytes_read_ec_{0};
+
   /// Total number of bytes from remote reads that were expected to be local.
   AtomicInt64 unexpected_remote_bytes_{0};
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index ff3970e49..5121d6acc 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -146,6 +146,7 @@ class RequestRange : public InternalQueue<RequestRange>::Node {
   int64_t offset() const { return offset_; }
   int64_t len() const { return len_; }
   int disk_id() const { return disk_id_; }
+  bool is_erasure_coded() const { return is_erasure_coded_; }
   RequestType::type request_type() const { return request_type_; }
 
  protected:
@@ -171,6 +172,9 @@ class RequestRange : public InternalQueue<RequestRange>::Node {
   /// Id of disk queue containing byte range.
   int disk_id_;
 
+  /// Whether file is erasure coded.
+  bool is_erasure_coded_;
+
   /// The type of IO request, READ or WRITE.
   RequestType::type request_type_;
 };
@@ -261,11 +265,26 @@ class ScanRange : public RequestRange {
     int64_t length;
   };
 
+  /// Struct for passing file info for constructing ScanRanges. Only contains details
+  /// consistent across all ranges for a given file. Filename is only used for the
+  /// duration of calls accepting FileInfo.
+  struct FileInfo {
+    const char *filename;
+    hdfsFS fs = nullptr;
+    int64_t mtime = ScanRange::INVALID_MTIME;
+    bool is_erasure_coded = false;
+  };
+
   /// Allocate a scan range object stored in the given 'obj_pool' and calls Reset() on it
   /// with the rest of the input variables.
-  static ScanRange* AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
+  static ScanRange* AllocateScanRange(ObjectPool* obj_pool, const FileInfo &fi,
       int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
-      int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts);
+      int disk_id, bool expected_local, const BufferOpts& buffer_opts);
+
+  /// Get file info for the current scan range.
+  FileInfo GetFileInfo() const {
+    return FileInfo{file_.c_str(), fs_, mtime_, is_erasure_coded_};
+  }
 
   /// Resets this scan range object with the scan range description. The scan range
   /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
@@ -285,15 +304,14 @@ class ScanRange : public RequestRange {
   /// TODO: IMPALA-4249: clarify if a ScanRange can be reused after Reset(). Currently
   /// it is not generally safe to do so, but some unit tests reuse ranges after
   /// successfully reading to eos.
-  void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-      bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
-      void* meta_data = nullptr, DiskFile* disk_file = nullptr,
-      DiskFile* disk_buffer_file = nullptr);
+  void Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+      bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr,
+      DiskFile* disk_file = nullptr, DiskFile* disk_buffer_file = nullptr);
 
   /// Same as above, but it also adds sub-ranges. No need to merge contiguous sub-ranges
   /// in advance, as this method will do the merge.
-  void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
-      bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
+  void Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+      bool expected_local, const BufferOpts& buffer_opts,
       std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr,
       DiskFile* disk_file = nullptr, DiskFile* disk_buffer_file = nullptr);
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 1851f3a1b..57d67bbfa 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -466,57 +466,57 @@ ScanRange::~ScanRange() {
   DCHECK(!read_in_flight_);
 }
 
-void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
-    void* meta_data, DiskFile* disk_file, DiskFile* disk_buffer_file) {
-  Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts, {}, meta_data,
-      disk_file, disk_buffer_file);
+void ScanRange::Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+    bool expected_local, const BufferOpts& buffer_opts, void* meta_data,
+    DiskFile* disk_file, DiskFile* disk_buffer_file) {
+  Reset(fi, len, offset, disk_id, expected_local, buffer_opts, {},
+      meta_data, disk_file, disk_buffer_file);
 }
 
-ScanRange* ScanRange::AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file,
+ScanRange* ScanRange::AllocateScanRange(ObjectPool* obj_pool, const FileInfo &fi,
     int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata,
-    int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts) {
+    int disk_id, bool expected_local, const BufferOpts& buffer_opts) {
   DCHECK_GE(disk_id, -1);
   DCHECK_GE(offset, 0);
   DCHECK_GE(len, 0);
   disk_id = ExecEnv::GetInstance()->disk_io_mgr()->AssignQueue(
-      file, disk_id, expected_local, /* check_default_fs */ true);
+      fi.filename, disk_id, expected_local, /* check_default_fs */ true);
   ScanRange* range = obj_pool->Add(new ScanRange);
-  range->Reset(fs, file, len, offset, disk_id, expected_local, mtime, buffer_opts,
+  range->Reset(fi, len, offset, disk_id, expected_local, buffer_opts,
       move(sub_ranges), metadata);
   return range;
 }
 
-void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
-    int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts,
-    vector<SubRange>&& sub_ranges, void* meta_data, DiskFile* disk_file,
-    DiskFile* disk_buffer_file) {
+void ScanRange::Reset(const FileInfo &fi, int64_t len, int64_t offset, int disk_id,
+    bool expected_local, const BufferOpts& buffer_opts, vector<SubRange>&& sub_ranges,
+    void* meta_data, DiskFile* disk_file, DiskFile* disk_buffer_file) {
   DCHECK(buffer_manager_->is_readybuffer_empty());
   DCHECK(!read_in_flight_);
-  DCHECK(file != nullptr);
+  DCHECK(fi.filename != nullptr);
   DCHECK_GE(len, 0);
   DCHECK_GE(offset, 0);
   DCHECK(buffer_opts.client_buffer_ == nullptr || buffer_opts.client_buffer_len_ >= len_);
-  fs_ = fs;
-  if (fs != nullptr) {
+  fs_ = fi.fs;
+  if (fs_ != nullptr) {
     file_reader_ = make_unique<HdfsFileReader>(this, fs_, false);
     local_buffer_reader_ = make_unique<LocalFileReader>(this);
   } else {
     file_reader_ = make_unique<LocalFileReader>(this);
   }
-  file_ = file;
+  file_ = fi.filename;
   len_ = len;
   bytes_to_read_ = len;
   offset_ = offset;
   disk_id_ = disk_id;
+  is_erasure_coded_ = fi.is_erasure_coded;
   cache_options_ = buffer_opts.cache_options_;
   disk_file_ = disk_file;
   disk_buffer_file_ = disk_buffer_file;
   // HDFS ranges must have an mtime > 0. Local ranges do not use mtime.
-  if (fs_) DCHECK_GT(mtime, 0);
-  mtime_ = mtime;
+  mtime_ = fi.mtime;
+  if (fs_) DCHECK_GT(mtime_, 0);
   meta_data_ = meta_data;
   if (buffer_opts.client_buffer_ != nullptr) {
     buffer_manager_->set_client_buffer();
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index be84ca420..8a90cf614 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -1629,16 +1629,18 @@ Status TmpFileGroup::ReadAsync(TmpWriteHandle* handle, MemRange buffer) {
     DiskFile* local_read_buffer_file = tmp_file->GetReadBufferFile(offset);
     DiskFile* remote_file = tmp_file->DiskFile();
     // Reset the read_range, use the remote filesystem's disk id.
-    handle->read_range_->Reset(tmp_file->hdfs_conn_, remote_file->path().c_str(),
-        handle->write_range_->len(), offset, tmp_file->disk_id(), false, tmp_file->mtime_,
+    handle->read_range_->Reset(
+        ScanRange::FileInfo{
+            remote_file->path().c_str(), tmp_file->hdfs_conn_, tmp_file->mtime_},
+        handle->write_range_->len(), offset, tmp_file->disk_id(), false,
         BufferOpts::ReadInto(
             read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING),
         nullptr, remote_file, local_read_buffer_file);
   } else {
     // Read from local.
-    handle->read_range_->Reset(nullptr, handle->write_range_->file(),
+    handle->read_range_->Reset(
+        ScanRange::FileInfo{handle->write_range_->file()},
         handle->write_range_->len(), offset, handle->write_range_->disk_id(), false,
-        ScanRange::INVALID_MTIME,
         BufferOpts::ReadInto(
             read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING));
   }
diff --git a/be/src/scheduling/scheduler.cc b/be/src/scheduling/scheduler.cc
index 5afaf73c6..67ea5414c 100644
--- a/be/src/scheduling/scheduler.cc
+++ b/be/src/scheduling/scheduler.cc
@@ -134,6 +134,7 @@ Status Scheduler::GenerateScanRanges(const vector<TFileSplitGeneratorSpec>& spec
       hdfs_scan_range.__set_offset(scan_range_offset);
       hdfs_scan_range.__set_partition_id(spec.partition_id);
       hdfs_scan_range.__set_partition_path_hash(spec.partition_path_hash);
+      hdfs_scan_range.__set_is_erasure_coded(fb_desc->is_ec());
       if (fb_desc->absolute_path() != nullptr) {
         hdfs_scan_range.__set_absolute_path(fb_desc->absolute_path()->str());
       }
@@ -1123,6 +1124,7 @@ void TScanRangeToScanRangePB(const TScanRange& tscan_range, ScanRangePB* scan_ra
   hdfs_file_split->set_mtime(tscan_range.hdfs_file_split.mtime);
   hdfs_file_split->set_partition_path_hash(
       tscan_range.hdfs_file_split.partition_path_hash);
+  hdfs_file_split->set_is_erasure_coded(tscan_range.hdfs_file_split.is_erasure_coded);
   if (tscan_range.hdfs_file_split.__isset.absolute_path) {
     hdfs_file_split->set_absolute_path(
         tscan_range.hdfs_file_split.absolute_path);
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index e57062fcc..6a2e07402 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -59,6 +59,8 @@ const char* ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ =
     "impala-server.io-mgr.short-circuit-bytes-read";
 const char* ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ =
     "impala-server.io-mgr.cached-bytes-read";
+const char* ImpaladMetricKeys::IO_MGR_ERASURE_CODED_BYTES_READ =
+    "impala-server.io-mgr.erasure-coded-bytes-read";
 const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES =
     "impala-server.io-mgr.remote-data-cache-hit-bytes";
 const char* ImpaladMetricKeys::IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT =
@@ -167,6 +169,7 @@ IntCounter* ImpaladMetrics::IO_MGR_BYTES_READ = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_LOCAL_BYTES_READ = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_SHORT_CIRCUIT_BYTES_READ = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_CACHED_BYTES_READ = nullptr;
+IntCounter* ImpaladMetrics::IO_MGR_ERASURE_CODED_BYTES_READ = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT = nullptr;
 IntCounter* ImpaladMetrics::IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES = nullptr;
@@ -341,6 +344,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ, 0);
   IO_MGR_CACHED_BYTES_READ = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_CACHED_BYTES_READ, 0);
+  IO_MGR_ERASURE_CODED_BYTES_READ = IO_MGR_METRICS->AddCounter(
+      ImpaladMetricKeys::IO_MGR_ERASURE_CODED_BYTES_READ, 0);
   IO_MGR_SHORT_CIRCUIT_BYTES_READ = IO_MGR_METRICS->AddCounter(
       ImpaladMetricKeys::IO_MGR_SHORT_CIRCUIT_BYTES_READ, 0);
   IO_MGR_BYTES_WRITTEN = IO_MGR_METRICS->AddCounter(
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index a3811f7b3..c43dca69f 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -76,6 +76,9 @@ class ImpaladMetricKeys {
   /// Total number of cached bytes read by the io mgr
   static const char* IO_MGR_CACHED_BYTES_READ;
 
+  /// Total number of erasure-coded bytes read by the io mgr
+  static const char* IO_MGR_ERASURE_CODED_BYTES_READ;
+
   /// Total number of bytes read from the remote data cache.
   static const char* IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES;
 
@@ -258,6 +261,7 @@ class ImpaladMetrics {
   static IntCounter* IO_MGR_BYTES_READ;
   static IntCounter* IO_MGR_LOCAL_BYTES_READ;
   static IntCounter* IO_MGR_CACHED_BYTES_READ;
+  static IntCounter* IO_MGR_ERASURE_CODED_BYTES_READ;
   static IntCounter* IO_MGR_REMOTE_DATA_CACHE_HIT_BYTES;
   static IntCounter* IO_MGR_REMOTE_DATA_CACHE_HIT_COUNT;
   static IntCounter* IO_MGR_REMOTE_DATA_CACHE_MISS_BYTES;
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 46e8a5a30..d76a25fa6 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -227,6 +227,9 @@ struct THdfsFileSplit {
   // The absolute path of the file, it's used only when data files are outside of
   // the Iceberg table location (IMPALA-11507).
   10: optional string absolute_path
+
+  // Whether the HDFS file is stored with erasure coding.
+  11: optional bool is_erasure_coded
 }
 
 // Key range for single THBaseScanNode. Corresponds to HBaseKeyRangePB and should be kept
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index a5abd6651..6e5bca495 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -409,6 +409,16 @@
     "kind": "COUNTER",
     "key": "impala-server.io-mgr.cached-bytes-read"
   },
+  {
+    "description": "Total number of erasure-coded bytes read by the IO manager.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Impala Server Io Mgr Erasure Coded Bytes Read",
+    "units": "BYTES",
+    "kind": "COUNTER",
+    "key": "impala-server.io-mgr.erasure-coded-bytes-read"
+  },
   {
     "description": "Total number of local bytes read by the IO manager.",
     "contexts": [
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 1958595ad..35cdddc4c 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -1424,6 +1424,7 @@ public class HdfsScanNode extends ScanNode {
           fileDesc.getFileCompression().toThrift(), fileDesc.getModificationTime(),
           partition.getLocation().hashCode());
       hdfsFileSplit.setAbsolute_path(fileDesc.getAbsolutePath());
+      hdfsFileSplit.setIs_erasure_coded(fileDesc.getIsEc());
       scanRange.setHdfs_file_split(hdfsFileSplit);
       if (fileDesc.getFbFileMetadata() != null) {
         scanRange.setFile_metadata(fileDesc.getFbFileMetadata().getByteBuffer());
diff --git a/tests/query_test/test_io_metrics.py b/tests/query_test/test_io_metrics.py
new file mode 100644
index 000000000..f4e4d1ef4
--- /dev/null
+++ b/tests/query_test/test_io_metrics.py
@@ -0,0 +1,79 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+
+from tests.common.environ import IS_DOCKERIZED_TEST_CLUSTER
+from tests.common.impala_test_suite import ImpalaTestSuite, LOG
+from tests.common.test_dimensions import create_single_exec_option_dimension
+from tests.util.filesystem_utils import IS_EC, IS_HDFS
+
+
+class TestIOMetrics(ImpalaTestSuite):
+  @classmethod
+  def get_workload(self):
+    return 'tpch'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestIOMetrics, cls).add_test_dimensions()
+    # Run with num_nodes=1 to make it easy to verify metric changes.
+    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension(num_nodes=1))
+
+  # Issue a local query and test that read metrics are updated.
+  @pytest.mark.execute_serially
+  def test_local_read(self, vector):
+    # Accumulate metrics that are expected to update from a read, and metrics that are
+    # expected not to change with this configuration. Metrics that shouldn't change for
+    # this test should be 0 throughout the whole test suite so we can just verify they're
+    # 0 after running our query. Omits cached-bytes-read because it has its own test.
+    expect_nonzero_metrics = ["impala-server.io-mgr.bytes-read"]
+    expect_zero_metrics = []
+
+    def append_metric(metric, expect_nonzero):
+      (expect_nonzero_metrics if expect_nonzero else expect_zero_metrics).append(metric)
+
+    append_metric("impala-server.io-mgr.erasure-coded-bytes-read", IS_EC)
+    append_metric("impala-server.io-mgr.short-circuit-bytes-read",
+        IS_HDFS and not IS_DOCKERIZED_TEST_CLUSTER)
+    # TODO: this should be updated for Ozone, but the code that updates it is guarded by
+    # IsHdfsPath and adding Ozone causes a crash. Plan to debug in IMPALA-11697.
+    append_metric("impala-server.io-mgr.local-bytes-read",
+        IS_HDFS and not IS_DOCKERIZED_TEST_CLUSTER)
+
+    nonzero_before = self.impalad_test_service.get_metric_values(expect_nonzero_metrics)
+
+    result = self.execute_query("select count(*) from tpch.nation")
+    assert(len(result.data) == 1)
+    assert(result.data[0] == '25')
+    nation_data_file_length = 2199
+
+    nonzero_after = self.impalad_test_service.get_metric_values(expect_nonzero_metrics)
+
+    zero_values = self.impalad_test_service.get_metric_values(expect_zero_metrics)
+    assert(len(expect_zero_metrics) == len(zero_values))
+    LOG.info("Verifying %s expect-zero metrics.", len(expect_zero_metrics))
+    for metric, value in zip(expect_zero_metrics, zero_values):
+      LOG.info("%s: %s", metric, value)
+      assert(value == 0)
+
+    assert(len(expect_nonzero_metrics) == len(nonzero_before) == len(nonzero_after))
+    LOG.info("Verifying %s expect-non-zero metrics.", len(expect_nonzero_metrics))
+    for metric, before, after in \
+        zip(expect_nonzero_metrics, nonzero_before, nonzero_after):
+      LOG.info("%s: %s -> %s", metric, before, after)
+      assert(before + nation_data_file_length == after)
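
--
A note for readers skimming the diff above: the heart of this change is replacing
the per-file positional arguments that previously threaded through every
AllocateScanRange() and Reset() overload (fs, filename, mtime, and now the
erasure-coding flag) with the single ScanRange::FileInfo struct defined in
request-ranges.h. The following is a minimal, self-contained C++ sketch of that
call-site pattern only; the types are simplified stand-ins rather than Impala's
real classes, and the sample path, mtime, and byte count are made up for
illustration.

#include <cstdint>
#include <iostream>

// Stand-in for the libhdfs connection handle (hdfsFS in Impala).
using FsHandle = void*;

// Mirrors ScanRange::FileInfo: details that are identical for every scan
// range over the same file travel together as one aggregate.
struct FileInfo {
  const char* filename;
  FsHandle fs = nullptr;
  int64_t mtime = -1;  // stand-in for ScanRange::INVALID_MTIME
  bool is_erasure_coded = false;
};

// Hypothetical consumer. Before the refactor, a signature like this carried
// fs/filename/mtime as separate parameters, so adding the EC bit would have
// meant editing every overload and call site instead of one struct.
void AllocateScanRange(const FileInfo& fi, int64_t offset, int64_t len) {
  std::cout << fi.filename << " [" << offset << ", " << offset + len << ") "
            << (fi.is_erasure_coded ? "erasure-coded" : "replicated") << "\n";
}

int main() {
  FileInfo fi{"/warehouse/tpch.nation/nation.tbl", nullptr, 1666980000,
      /*is_erasure_coded=*/true};
  AllocateScanRange(fi, 0, 2199);  // one struct instead of four arguments
  return 0;
}

This is also why both HdfsFileDesc and ScanRange gain a GetFileInfo() helper in
the diff: a caller that already holds one of those objects forwards the file's
identity in a single expression, and the new is_erasure_coded bit reaches
HdfsFileReader::GetHdfsStatistics() -- where the erasure-coded-bytes-read
counter is incremented -- without any further signature changes.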
