IMPALA-4863/IMPALA-5311: Correctly account for the file type and compression codec
If a scan range is skipped at runtime, the scan node never reads the
range and so never discovers the compression codec used to compress the
underlying files. In that scenario we default the compression codec to
NONE, which can be misleading. This change instead marks these files as
skipped in the scan node profile, e.g.:

  File Formats: TEXT/NONE:364 TEXT/NONE(Skipped):1460

Change-Id: I797916505f62e568f4159e07099481b8ff571da2
Reviewed-on: http://gerrit.cloudera.org:8080/7245
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Tim Armstrong <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f87da848
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f87da848
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f87da848

Branch: refs/heads/master
Commit: f87da848f5f204ae0dc84ffd9de64007e197c4d9
Parents: fa93a47
Author: aphadke <[email protected]>
Authored: Mon Jun 19 16:34:57 2017 -0700
Committer: Tim Armstrong <[email protected]>
Committed: Thu Sep 21 17:38:08 2017 +0000
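For illustration, a minimal self-contained sketch of the counting scheme this
change introduces: a map keyed on (file format, skipped, codec) and the
rendering of the "File Formats" string. The enums and stream operators below
are stand-ins invented for the example, not Impala's Thrift types:

  #include <iostream>
  #include <map>
  #include <sstream>
  #include <tuple>

  // Stand-ins for Impala's THdfsFileFormat / THdfsCompression Thrift enums.
  enum class FileFormat { TEXT, PARQUET };
  enum class Codec { NONE, SNAPPY };

  std::ostream& operator<<(std::ostream& os, FileFormat f) {
    return os << (f == FileFormat::TEXT ? "TEXT" : "PARQUET");
  }
  std::ostream& operator<<(std::ostream& os, Codec c) {
    return os << (c == Codec::NONE ? "NONE" : "SNAPPY");
  }

  int main() {
    // Mirrors the patched FileTypeCountsMap: the bool records whether the
    // range was skipped (runtime-filtered or metadata-only).
    std::map<std::tuple<FileFormat, bool, Codec>, int> counts;
    ++counts[std::make_tuple(FileFormat::TEXT, false, Codec::NONE)];
    ++counts[std::make_tuple(FileFormat::TEXT, true, Codec::NONE)];
    ++counts[std::make_tuple(FileFormat::PARQUET, true, Codec::NONE)];

    std::ostringstream ss;
    for (const auto& [key, n] : counts) {
      const auto& [format, skipped, codec] = key;
      if (skipped) {
        if (format == FileFormat::PARQUET) {
          // A skipped Parquet range was never read, so its codec is unknown.
          ss << format << "/Unknown(Skipped):" << n << " ";
        } else {
          ss << format << "/" << codec << "(Skipped):" << n << " ";
        }
      } else {
        ss << format << "/" << codec << ":" << n << " ";
      }
    }
    // Prints: File Formats: TEXT/NONE:1 TEXT/NONE(Skipped):1 PARQUET/Unknown(Skipped):1
    std::cout << "File Formats: " << ss.str() << std::endl;
  }

Keying the counter on the skipped flag, rather than overloading the codec
value, is what lets skipped and actually-read ranges of the same format/codec
be reported side by side, as in the profile line above.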
----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc             |  9 ++++--
 be/src/exec/hdfs-scan-node-base.cc              | 28 ++++++++++++++----
 be/src/exec/hdfs-scan-node-base.h               | 12 ++++++--
 be/src/exec/hdfs-scan-node.cc                   |  4 +--
 be/src/exec/hdfs-scan-node.h                    |  2 +-
 .../queries/QueryTest/hdfs_scanner_profile.test | 30 ++++++++++++++++++--
 tests/query_test/test_scanners.py               |  6 ++--
 7 files changed, 71 insertions(+), 20 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 4cd4340..57f1e24 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -315,9 +315,12 @@ void HdfsParquetScanner::Close(RowBatch* row_batch) {
   assemble_rows_timer_.ReleaseCounter();

   // If this was a metadata only read (i.e. count(*)), there are no columns.
-  if (compression_types.empty()) compression_types.push_back(THdfsCompression::NONE);
-  scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types);
-
+  if (compression_types.empty()) {
+    compression_types.push_back(THdfsCompression::NONE);
+    scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types, true);
+  } else {
+    scan_node_->RangeComplete(THdfsFileFormat::PARQUET, compression_types);
+  }
   if (schema_resolver_.get() != nullptr) schema_resolver_.reset();

   ScalarExprEvaluator::Close(min_max_conjunct_evals_, state_);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index b5169a8..e74efcd 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -553,7 +553,7 @@ bool HdfsScanNodeBase::FilePassesFilterPredicates(
       filter_ctxs)) {
     for (int j = 0; j < file->splits.size(); ++j) {
       // Mark range as complete to ensure progress.
-      RangeComplete(format, file->file_compression);
+      RangeComplete(format, file->file_compression, true);
     }
     return false;
   }
@@ -775,18 +775,18 @@ bool HdfsScanNodeBase::PartitionPassesFilters(int32_t partition_id,
 }

 void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
-    const THdfsCompression::type& compression_type) {
+    const THdfsCompression::type& compression_type, bool skipped) {
   vector<THdfsCompression::type> types;
   types.push_back(compression_type);
-  RangeComplete(file_type, types);
+  RangeComplete(file_type, types, skipped);
 }

 void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
-    const vector<THdfsCompression::type>& compression_types) {
+    const vector<THdfsCompression::type>& compression_types, bool skipped) {
   scan_ranges_complete_counter()->Add(1);
   progress_.Update(1);
   for (int i = 0; i < compression_types.size(); ++i) {
-    ++file_type_counts_[make_pair(file_type, compression_types[i])];
+    ++file_type_counts_[std::make_tuple(file_type, skipped, compression_types[i])];
   }
 }

@@ -871,7 +871,23 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
   {
     for (FileTypeCountsMap::const_iterator it = file_type_counts_.begin();
         it != file_type_counts_.end(); ++it) {
-      ss << it->first.first << "/" << it->first.second << ":" << it->second << " ";
+
+      THdfsFileFormat::type file_format = std::get<0>(it->first);
+      bool skipped = std::get<1>(it->first);
+      THdfsCompression::type compression_type = std::get<2>(it->first);
+
+      if (skipped) {
+        if (file_format == THdfsFileFormat::PARQUET) {
+          // If a scan range stored as parquet is skipped, its compression type
+          // cannot be figured out without reading the data.
+          ss << file_format << "/" << "Unknown" << "(Skipped):" << it->second << " ";
+        } else {
+          ss << file_format << "/" << compression_type << "(Skipped):"
+             << it->second << " ";
+        }
+      } else {
+        ss << file_format << "/" << compression_type << ":" << it->second << " ";
+      }
     }
   }
   runtime_profile_->AddInfoString("File Formats", ss.str());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index e33de5a..7e9d322 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -23,6 +23,7 @@
 #include <memory>
 #include <unordered_set>
 #include <vector>
+#include <tuple>

 #include <boost/unordered_map.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -252,11 +253,15 @@ class HdfsScanNodeBase : public ScanNode {
   /// Otherwise, scan nodes using a RowBatch queue may lose the last batch due
   /// to racing with shutting down the queue.
   void RangeComplete(const THdfsFileFormat::type& file_type,
-      const THdfsCompression::type& compression_type);
+      const THdfsCompression::type& compression_type, bool skipped = false);
+
   /// Same as above except for when multiple compression codecs were used
   /// in the file. The metrics are incremented for each compression_type.
+  /// 'skipped' is set to true in the following cases:
+  /// 1. the scan range is filtered out at runtime
+  /// 2. the scan range is a metadata-only read (e.g. count(*) on Parquet files)
   virtual void RangeComplete(const THdfsFileFormat::type& file_type,
-      const std::vector<THdfsCompression::type>& compression_type);
+      const std::vector<THdfsCompression::type>& compression_type, bool skipped = false);

   /// Utility function to compute the order in which to materialize slots to allow for
   /// computing conjuncts as slots get materialized (on partial tuples).
@@ -492,7 +497,8 @@ class HdfsScanNodeBase : public ScanNode {
   /// Mapping of file formats (file type, compression type) to the number of
   /// splits of that type and the lock protecting it.
   typedef std::map<
-      std::pair<THdfsFileFormat::type, THdfsCompression::type>, int> FileTypeCountsMap;
+      std::tuple<THdfsFileFormat::type, bool, THdfsCompression::type>,
+      int> FileTypeCountsMap;
   FileTypeCountsMap file_type_counts_;

   /// Performs dynamic partition pruning, i.e., applies runtime filters to files, and

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 528e290..64eece3 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -242,9 +242,9 @@ void HdfsScanNode::Close(RuntimeState* state) {
 }

 void HdfsScanNode::RangeComplete(const THdfsFileFormat::type& file_type,
-    const std::vector<THdfsCompression::type>& compression_type) {
+    const std::vector<THdfsCompression::type>& compression_type, bool skipped) {
   lock_guard<SpinLock> l(file_type_counts_);
-  HdfsScanNodeBase::RangeComplete(file_type, compression_type);
+  HdfsScanNodeBase::RangeComplete(file_type, compression_type, skipped);
 }

 void HdfsScanNode::TransferToScanNodePool(MemPool* pool) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 18a74ad..782f530 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -93,7 +93,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
   /// batch queue. Otherwise, we may lose the last batch due to racing with shutting down
   /// the RowBatch queue.
   virtual void RangeComplete(const THdfsFileFormat::type& file_type,
-      const std::vector<THdfsCompression::type>& compression_type);
+      const std::vector<THdfsCompression::type>& compression_type, bool skipped = false);

   /// Transfers all memory from 'pool' to 'scan_node_pool_'.
   virtual void TransferToScanNodePool(MemPool* pool);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test b/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test
index ea459e4..0fe5fb3 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/hdfs_scanner_profile.test
@@ -1,8 +1,34 @@
-====
 ---- QUERY
 # This query will do a full table scan to count the num of rows
 # read during a scan
 select * from alltypesagg
 ---- RUNTIME_PROFILE
 row_regex: .*RowsRead: 11.00K .*
 ====
+---- QUERY
+# This query verifies that a scan range is marked as skipped
+# in the profile if the correct compression cannot be inferred
+# for a scan range
+select count(*) from tpcds_parquet.store_sales
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/Unknown\(Skipped\):.*
+====
+---- QUERY
+# This query verifies that when a parquet scan range is runtime
+# filtered, it is marked as skipped and the compression codec is
+# marked as unknown.
+set runtime_filter_wait_time_ms=500000;
+select count(*) from tpcds_parquet.store_sales
+join tpcds_parquet.date_dim on
+ss_sold_date_sk = d_date_sk where d_qoy=1
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/NONE:.* PARQUET/Unknown\(Skipped\).*
+====
+---- QUERY
+# This query verifies that when a text scan range is runtime
+# filtered, it is marked as skipped.
+set runtime_filter_wait_time_ms=100000;
+select count(*) from tpcds.store_sales join tpcds.date_dim on
+ss_sold_date_sk = d_date_sk where d_qoy=1
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: TEXT/NONE:.* TEXT/NONE\(Skipped\):.*
+====

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f87da848/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index d355081..f4f2fd6 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -73,10 +73,10 @@ class TestScannersAllTableFormats(ImpalaTestSuite):
     self.run_test_case('QueryTest/scanners', new_vector)

   def test_hdfs_scanner_profile(self, vector):
-    new_vector = deepcopy(vector)
-    new_vector.get_value('exec_option')['num_nodes'] = 1
-    if new_vector.get_value('table_format').file_format in ('kudu', 'hbase'):
+    if vector.get_value('table_format').file_format in ('kudu', 'hbase'):
       pytest.skip()
+    new_vector = deepcopy(vector)
+    new_vector.get_value('exec_option')['num_nodes'] = 0
     self.run_test_case('QueryTest/hdfs_scanner_profile', new_vector)

 # Test all the scanners with a simple limit clause. The limit clause triggers
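A note on the API shape documented in hdfs-scan-node-base.h above: the new
'skipped' parameter defaults to false, so pre-existing RangeComplete() call
sites compile unchanged, and HdfsScanNode's override exists only to serialize
counter updates across scanner threads before delegating to the base class. A
minimal sketch of that pattern, with std::mutex standing in for Impala's
SpinLock and deliberately simplified signatures:

  #include <iostream>
  #include <mutex>
  #include <vector>

  class ScanNodeBase {
   public:
    virtual ~ScanNodeBase() = default;
    // The new trailing parameter defaults to false so existing callers are
    // unaffected; only skipping code paths pass true explicitly.
    virtual void RangeComplete(int file_type, const std::vector<int>& codecs,
        bool skipped = false) {
      for (int codec : codecs) {
        std::cout << "format=" << file_type << " codec=" << codec
                  << " skipped=" << std::boolalpha << skipped << "\n";
      }
    }
  };

  class ScanNode : public ScanNodeBase {
   public:
    void RangeComplete(int file_type, const std::vector<int>& codecs,
        bool skipped = false) override {
      // Serialize counter updates across scanner threads, then delegate.
      std::lock_guard<std::mutex> l(counts_lock_);
      ScanNodeBase::RangeComplete(file_type, codecs, skipped);
    }

   private:
    std::mutex counts_lock_;
  };

  int main() {
    ScanNode node;
    node.RangeComplete(0, {0});        // normal completion
    node.RangeComplete(0, {0}, true);  // runtime-filtered / metadata-only range
  }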
