IMPALA-5448: fix invalid number of splits reported in Parquet scan node
Parquet splits with multiple columns are marked as completed via
HdfsScanNodeBase::RangeComplete(), which incremented the file type counter
once per column codec type. As a result, the number of Parquet splits
reported in the profile was the real count multiplied by the number of
materialized columns (e.g. one split scanning four columns was reported
as four splits).
Furthermore, the Parquet format allows different compression codecs on
different columns. This is handled in this patch as well. A Parquet file
using both the gzip and snappy compression codecs will be reported as:
File Formats: PARQUET/(GZIP,SNAPPY):1
This patch introduces a set of compression types to represent such cases.
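As an illustration of the approach, here is a minimal standalone sketch
(with a hypothetical Codec enum and CodecSet type, not Impala's
Thrift-generated THdfsCompression or the HdfsCompressionTypesSet added
below): the per-column codecs of a split are collapsed into one
bitmap-backed set that keys the counter map, so each completed range is
counted exactly once.

#include <cstdint>
#include <iostream>
#include <map>
#include <vector>

// Hypothetical stand-in for the Thrift-generated THdfsCompression enum.
enum class Codec : int { NONE = 0, GZIP = 1, SNAPPY = 2 };

// Fixed-width bitmap of the codecs seen in one split; orderable so it can
// be used as (part of) a std::map key.
struct CodecSet {
  uint32_t bits = 0;
  void Add(Codec c) { bits |= 1u << static_cast<int>(c); }
  bool operator<(const CodecSet& o) const { return bits < o.bits; }
};

int main() {
  // Two completed splits of the same layout; each has a gzip and a snappy column.
  std::vector<std::vector<Codec>> completed_splits = {
      {Codec::GZIP, Codec::SNAPPY}, {Codec::GZIP, Codec::SNAPPY}};

  // The old behavior incremented once per column codec (4 "splits");
  // counting once per split, keyed by its codec set, reports 2.
  std::map<CodecSet, int> split_counts;
  for (const auto& columns : completed_splits) {
    CodecSet set;
    for (Codec c : columns) set.Add(c);
    ++split_counts[set];
  }
  for (const auto& entry : split_counts) {
    std::cout << "codec bitmap " << entry.first.bits << " -> "
              << entry.second << " split(s)\n";  // prints: codec bitmap 6 -> 2 split(s)
  }
  return 0;
}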
Testing:
Added end-to-end tests covering Parquet files with all columns compressed
with snappy, as well as Parquet files mixing multiple compression codecs.
Change-Id: Iaacc2d775032f5707061e704f12e0a63cde695d1
Reviewed-on: http://gerrit.cloudera.org:8080/8147
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/192cd96d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/192cd96d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/192cd96d
Branch: refs/heads/master
Commit: 192cd96d9ee3be1cd3e7c3ad774bf8d5c8efb1c0
Parents: adb92d3
Author: stiga-huang <[email protected]>
Authored: Tue Sep 26 16:45:11 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue Oct 10 01:30:33 2017 +0000
----------------------------------------------------------------------
be/src/exec/hdfs-scan-node-base.cc | 28 ++++++++--
be/src/exec/hdfs-scan-node-base.h | 56 ++++++++++++++++---
testdata/multi_compression_parquet_data/README | 8 +++
.../tinytable_0_gzip_snappy.parq | Bin 0 -> 573 bytes
.../tinytable_1_snappy_gzip.parq | Bin 0 -> 578 bytes
.../hdfs_parquet_scan_node_profile.test | 20 +++++++
tests/query_test/test_scanners.py | 33 +++++++++++
7 files changed, 132 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/192cd96d/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 59d4ca2..1cfee5e 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -697,9 +697,11 @@ void HdfsScanNodeBase::RangeComplete(const THdfsFileFormat::type& file_type,
const vector<THdfsCompression::type>& compression_types, bool skipped) {
scan_ranges_complete_counter()->Add(1);
progress_.Update(1);
+ HdfsCompressionTypesSet compression_set;
for (int i = 0; i < compression_types.size(); ++i) {
- ++file_type_counts_[std::make_tuple(file_type, skipped, compression_types[i])];
+ compression_set.AddType(compression_types[i]);
}
+ ++file_type_counts_[std::make_tuple(file_type, skipped, compression_set)];
}
void HdfsScanNodeBase::ComputeSlotMaterializationOrder(vector<int>* order) const {
@@ -786,19 +788,33 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
THdfsFileFormat::type file_format = std::get<0>(it->first);
bool skipped = std::get<1>(it->first);
- THdfsCompression::type compression_type = std::get<2>(it->first);
+ HdfsCompressionTypesSet compressions_set = std::get<2>(it->first);
+ int file_cnt = it->second;
if (skipped) {
if (file_format == THdfsFileFormat::PARQUET) {
// If a scan range stored as parquet is skipped, its compression type
// cannot be figured out without reading the data.
- ss << file_format << "/" << "Unknown" << "(Skipped):" << it->second << " ";
+ ss << file_format << "/" << "Unknown" << "(Skipped):" << file_cnt << " ";
} else {
- ss << file_format << "/" << compression_type << "(Skipped):"
- << it->second << " ";
+ ss << file_format << "/" << compressions_set.GetFirstType() << "(Skipped):"
+ << file_cnt << " ";
}
+ } else if (compressions_set.Size() == 1) {
+ ss << file_format << "/" << compressions_set.GetFirstType() << ":" << file_cnt
+ << " ";
} else {
- ss << file_format << "/" << compression_type << ":" << it->second << " ";
+ ss << file_format << "/" << "(";
+ bool first = true;
+ for (auto& elem : _THdfsCompression_VALUES_TO_NAMES) {
+ THdfsCompression::type type = static_cast<THdfsCompression::type>(
+ elem.first);
+ if (!compressions_set.HasType(type)) continue;
+ if (!first) ss << ",";
+ ss << type;
+ first = false;
+ }
+ ss << "):" << file_cnt << " ";
}
}
}
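For reference, a simplified standalone sketch of the three reporting
branches in the hunk above (a hypothetical FormatEntry helper over plain
std::set<std::string> codec names rather than the Thrift enum and
HdfsCompressionTypesSet; the skipped branch here only models the
Parquet/Unknown case):

#include <iostream>
#include <set>
#include <sstream>
#include <string>

// Formats one file-format counter entry in the style produced by the patch,
// e.g. "PARQUET/SNAPPY:1 " or "PARQUET/(GZIP,SNAPPY):2 ".
std::string FormatEntry(const std::string& file_format,
    const std::set<std::string>& codecs, bool skipped, int count) {
  std::stringstream ss;
  if (skipped) {
    // For a skipped Parquet range the codec cannot be known without reading data.
    ss << file_format << "/Unknown(Skipped):" << count << " ";
  } else if (codecs.size() == 1) {
    // Single codec: same format as before the patch.
    ss << file_format << "/" << *codecs.begin() << ":" << count << " ";
  } else {
    // Multiple codecs: parenthesised, comma-separated, in sorted order.
    ss << file_format << "/(";
    bool first = true;
    for (const std::string& codec : codecs) {
      if (!first) ss << ",";
      ss << codec;
      first = false;
    }
    ss << "):" << count << " ";
  }
  return ss.str();
}

int main() {
  std::cout << FormatEntry("PARQUET", {"SNAPPY"}, false, 1) << "\n";
  std::cout << FormatEntry("PARQUET", {"GZIP", "SNAPPY"}, false, 2) << "\n";
  // Prints: "PARQUET/SNAPPY:1 " and "PARQUET/(GZIP,SNAPPY):2 "
  return 0;
}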
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/192cd96d/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index da95ecc..252964b 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -479,13 +479,6 @@ class HdfsScanNodeBase : public ScanNode {
/// scanner threads.
Status status_;
- /// Mapping of file formats (file type, compression type) to the number of
- /// splits of that type and the lock protecting it.
- typedef std::map<
- std::tuple<THdfsFileFormat::type, bool, THdfsCompression::type>,
- int> FileTypeCountsMap;
- FileTypeCountsMap file_type_counts_;
-
/// Performs dynamic partition pruning, i.e., applies runtime filters to files, and
/// issues initial ranges for all file types. Waits for runtime filters if necessary.
/// Only valid to call if !initial_ranges_issued_. Sets initial_ranges_issued_ to true.
@@ -525,6 +518,55 @@ class HdfsScanNodeBase : public ScanNode {
/// Calls ExecNode::ExecDebugAction() with 'phase'. Returns the status based on the
/// debug action specified for the query.
Status ScanNodeDebugAction(TExecNodePhase::type phase) WARN_UNUSED_RESULT;
+
+ private:
+ class HdfsCompressionTypesSet {
+ public:
+ HdfsCompressionTypesSet(): bit_map_(0) {
+ DCHECK_GE(sizeof(bit_map_) * CHAR_BIT, _THdfsCompression_VALUES_TO_NAMES.size());
+ }
+
+ bool HasType(THdfsCompression::type type) {
+ return (bit_map_ & (1 << type)) != 0;
+ }
+
+ void AddType(const THdfsCompression::type type) {
+ bit_map_ |= (1 << type);
+ }
+
+ int Size() { return BitUtil::Popcount(bit_map_); }
+
+ THdfsCompression::type GetFirstType() {
+ DCHECK_GT(Size(), 0);
+ for (auto& elem : _THdfsCompression_VALUES_TO_NAMES) {
+ THdfsCompression::type type = static_cast<THdfsCompression::type>(elem.first);
+ if (HasType(type)) return type;
+ }
+ return THdfsCompression::NONE;
+ }
+
+ // The following operator overloading is needed so this class can be part of the
+ // std::map key.
+ bool operator< (const HdfsCompressionTypesSet& o) const {
+ return bit_map_ < o.bit_map_;
+ }
+
+ bool operator== (const HdfsCompressionTypesSet& o) const {
+ return bit_map_ == o.bit_map_;
+ }
+
+ private:
+ uint32_t bit_map_;
+ };
+
+ /// Mapping of file formats to the number of splits of that type. The key is a tuple
+ /// containing:
+ /// * file type
+ /// * whether the split was skipped
+ /// * compression types set
+ typedef std::map<std::tuple<THdfsFileFormat::type, bool, HdfsCompressionTypesSet>, int>
+ FileTypeCountsMap;
+ FileTypeCountsMap file_type_counts_;
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/192cd96d/testdata/multi_compression_parquet_data/README
----------------------------------------------------------------------
diff --git a/testdata/multi_compression_parquet_data/README b/testdata/multi_compression_parquet_data/README
new file mode 100644
index 0000000..332f24d
--- /dev/null
+++ b/testdata/multi_compression_parquet_data/README
@@ -0,0 +1,8 @@
+These Parquet files were created by modifying Impala's HdfsParquetTableWriter.
+
+String Data
+-----------
+These files have two string columns 'a' and 'b'. Each column uses a different compression type.
+
+tinytable_0_gzip_snappy.parq: column 'a' is compressed by gzip, column 'b' is compressed by snappy
+tinytable_1_snappy_gzip.parq: column 'a' is compressed by snappy, column 'b' is compressed by gzip
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/192cd96d/testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq
----------------------------------------------------------------------
diff --git a/testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq b/testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq
new file mode 100644
index 0000000..4fd468b
Binary files /dev/null and b/testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq differ
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/192cd96d/testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq
----------------------------------------------------------------------
diff --git a/testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq b/testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq
new file mode 100644
index 0000000..e5d2220
Binary files /dev/null and b/testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq differ
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/192cd96d/testdata/workloads/functional-query/queries/QueryTest/hdfs_parquet_scan_node_profile.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/hdfs_parquet_scan_node_profile.test b/testdata/workloads/functional-query/queries/QueryTest/hdfs_parquet_scan_node_profile.test
new file mode 100644
index 0000000..7e55ab1
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/hdfs_parquet_scan_node_profile.test
@@ -0,0 +1,20 @@
+# Regression test for IMPALA-5448
+# This query will do a full scan on a parquet file
+select * from functional_parquet.alltypestiny where year=2009 and month=1
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/SNAPPY:1
+====
+---- QUERY
+# This query will do a full scan on a parquet table with two partitions.
+# Each partition uses different compression types.
+select * from alltypes_multi_compression
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/GZIP:1 PARQUET/SNAPPY:1
+====
+---- QUERY
+# This query will do a full scan on a parquet table with multiple
+# compression types
+select * from multi_compression
+---- RUNTIME_PROFILE
+row_regex: .*File Formats: PARQUET/\(GZIP,SNAPPY\):2
+====
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/192cd96d/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index f4f2fd6..ca7cd6b 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -327,6 +327,39 @@ class TestParquet(ImpalaTestSuite):
assert len(result.data) == 1
assert "4294967294" in result.data
+ def test_multi_compression_types(self, vector, unique_database):
+ """IMPALA-5448: Tests that parquet splits with multi compression types are
counted
+ correctly. Cases tested:
+ - parquet file with columns using the same compression type
+ - parquet files using snappy and gzip compression types
+ """
+ self.client.execute("create table %s.alltypes_multi_compression like"
+ " functional_parquet.alltypes" % unique_database)
+ hql_format = "set parquet.compression={codec};" \
+ "insert into table %s.alltypes_multi_compression" \
+ " partition (year = {year}, month = {month})" \
+ " select id, bool_col, tinyint_col, smallint_col, int_col,
bigint_col," \
+ " float_col, double_col,date_string_col,string_col,timestamp_col" \
+ " from functional_parquet.alltypes" \
+ " where year = {year} and month = {month}" % unique_database
+ check_call(['hive', '-e', hql_format.format(codec="snappy", year=2010, month=1)])
+ check_call(['hive', '-e', hql_format.format(codec="gzip", year=2010, month=2)])
+
+ self.client.execute("create table %s.multi_compression (a string, b string)"
+ " stored as parquet" % unique_database)
+ multi_compression_tbl_loc =\
+ get_fs_path("/test-warehouse/%s.db/%s" % (unique_database, "multi_compression"))
+ check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+ "/testdata/multi_compression_parquet_data/tinytable_0_gzip_snappy.parq",
+ multi_compression_tbl_loc])
+ check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+ "/testdata/multi_compression_parquet_data/tinytable_1_snappy_gzip.parq",
+ multi_compression_tbl_loc])
+
+ vector.get_value('exec_option')['num_nodes'] = 1
+ self.run_test_case('QueryTest/hdfs_parquet_scan_node_profile',
+ vector, unique_database)
+
def test_corrupt_rle_counts(self, vector, unique_database):
"""IMPALA-3646: Tests that a certain type of file corruption for plain
dictionary encoded values is gracefully handled. Cases tested: