Repository: impala Updated Branches: refs/heads/master 8b7f27af8 -> 73e90d237
IMPALA-6592: add test for invalid parquet codecs IMPALA-6592 revealed a gap in test coverage for files with invalid/unsupported Parquet codecs. This adds a test that reproduces the bug that was present in my IMPALA-4835 patch. master is unaffected by this bug. I also hid the conversion tables and made the conversion go through functions that validate the enum values, to make it easier to track down problems like this in the future. Testing: Ran exhaustive tests. Change-Id: I1502ea7b7f39aa09f0ed2677e84219b37c64c416 Reviewed-on: http://gerrit.cloudera.org:8080/9500 Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com> Tested-by: Impala Public Jenkins Project: http://git-wip-us.apache.org/repos/asf/impala/repo Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/73e90d23 Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/73e90d23 Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/73e90d23 Branch: refs/heads/master Commit: 73e90d237e5d0c59a06480b769e2dddf6891b49f Parents: 8b7f27a Author: Tim Armstrong <tarmstr...@cloudera.com> Authored: Mon Mar 5 14:12:15 2018 -0800 Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org> Committed: Thu Mar 8 04:48:36 2018 +0000 ---------------------------------------------------------------------- be/src/exec/CMakeLists.txt | 1 + be/src/exec/hdfs-parquet-table-writer.cc | 10 +- be/src/exec/parquet-column-readers.h | 4 +- be/src/exec/parquet-common.cc | 91 +++++++++++++++++++ be/src/exec/parquet-common.h | 52 +++-------- be/src/util/parquet-reader.cc | 2 +- testdata/data/README | 7 ++ testdata/data/bad_codec.parquet | Bin 0 -> 3026 bytes .../queries/QueryTest/parquet-bad-codec.test | 6 ++ tests/query_test/test_scanners.py | 13 +++ 10 files changed, 138 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/be/src/exec/CMakeLists.txt 
---------------------------------------------------------------------- diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index aab1383..ddd84ee 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -81,6 +81,7 @@ add_library(Exec kudu-scan-node-mt.cc kudu-table-sink.cc kudu-util.cc + parquet-common.cc read-write-util.cc scan-node.cc scanner-context.cc http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/be/src/exec/hdfs-parquet-table-writer.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc index 2d40f14..6370859 100644 --- a/be/src/exec/hdfs-parquet-table-writer.cc +++ b/be/src/exec/hdfs-parquet-table-writer.cc @@ -182,8 +182,8 @@ class HdfsParquetTableWriter::BaseColumnWriter { uint64_t num_values() const { return num_values_; } uint64_t total_compressed_size() const { return total_compressed_byte_size_; } uint64_t total_uncompressed_size() const { return total_uncompressed_byte_size_; } - parquet::CompressionCodec::type codec() const { - return IMPALA_TO_PARQUET_CODEC[codec_]; + parquet::CompressionCodec::type GetParquetCodec() const { + return ConvertImpalaToParquetCodec(codec_); } protected: @@ -860,7 +860,7 @@ Status HdfsParquetTableWriter::CreateSchema() { parquet::SchemaElement& node = file_metadata_.schema[i + 1]; const ColumnType& type = output_expr_evals_[i]->root().type(); node.name = table_desc_->col_descs()[i + num_clustering_cols].name(); - node.__set_type(INTERNAL_TO_PARQUET_TYPES[type.type]); + node.__set_type(ConvertInternalToParquetType(type.type)); node.__set_repetition_type(FieldRepetitionType::OPTIONAL); if (type.type == TYPE_DECIMAL) { // This column is type decimal. 
Update the file metadata to include the @@ -901,10 +901,10 @@ Status HdfsParquetTableWriter::AddRowGroup() { current_row_group_->columns.resize(columns_.size()); for (int i = 0; i < columns_.size(); ++i) { ColumnMetaData metadata; - metadata.type = INTERNAL_TO_PARQUET_TYPES[columns_[i]->type().type]; + metadata.type = ConvertInternalToParquetType(columns_[i]->type().type); metadata.path_in_schema.push_back( table_desc_->col_descs()[i + num_clustering_cols].name()); - metadata.codec = columns_[i]->codec(); + metadata.codec = columns_[i]->GetParquetCodec(); current_row_group_->columns[i].__set_meta_data(metadata); } http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/be/src/exec/parquet-column-readers.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h index 86ca239..aa90bb8 100644 --- a/be/src/exec/parquet-column-readers.h +++ b/be/src/exec/parquet-column-readers.h @@ -355,7 +355,7 @@ class BaseScalarColumnReader : public ParquetColumnReader { if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) { RETURN_IF_ERROR(Codec::CreateDecompressor( - NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_)); + NULL, false, ConvertParquetToImpalaCodec(metadata_->codec), &decompressor_)); } ClearDictionaryDecoder(); return Status::OK(); @@ -376,7 +376,7 @@ class BaseScalarColumnReader : public ParquetColumnReader { int col_idx() const { return node_.col_idx; } THdfsCompression::type codec() const { if (metadata_ == NULL) return THdfsCompression::NONE; - return PARQUET_TO_IMPALA_CODEC[metadata_->codec]; + return ConvertParquetToImpalaCodec(metadata_->codec); } /// Reads the next definition and repetition levels for this column. 
Initializes the http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/be/src/exec/parquet-common.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-common.cc b/be/src/exec/parquet-common.cc new file mode 100644 index 0000000..651e7fd --- /dev/null +++ b/be/src/exec/parquet-common.cc @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/parquet-common.h" + +namespace impala { + +/// Mapping of impala's internal types to parquet storage types. 
This is indexed by +/// PrimitiveType enum +const parquet::Type::type INTERNAL_TO_PARQUET_TYPES[] = { + parquet::Type::BOOLEAN, // Invalid + parquet::Type::BOOLEAN, // NULL type + parquet::Type::BOOLEAN, + parquet::Type::INT32, + parquet::Type::INT32, + parquet::Type::INT32, + parquet::Type::INT64, + parquet::Type::FLOAT, + parquet::Type::DOUBLE, + parquet::Type::INT96, // Timestamp + parquet::Type::BYTE_ARRAY, // String + parquet::Type::BYTE_ARRAY, // Date, NYI + parquet::Type::BYTE_ARRAY, // DateTime, NYI + parquet::Type::BYTE_ARRAY, // Binary NYI + parquet::Type::FIXED_LEN_BYTE_ARRAY, // Decimal + parquet::Type::BYTE_ARRAY, // VARCHAR(N) + parquet::Type::BYTE_ARRAY, // CHAR(N) +}; + +const int INTERNAL_TO_PARQUET_TYPES_SIZE = + sizeof(INTERNAL_TO_PARQUET_TYPES) / sizeof(INTERNAL_TO_PARQUET_TYPES[0]); + +/// Mapping of Parquet codec enums to Impala enums +const THdfsCompression::type PARQUET_TO_IMPALA_CODEC[] = { + THdfsCompression::NONE, + THdfsCompression::SNAPPY, + THdfsCompression::GZIP, + THdfsCompression::LZO +}; + +const int PARQUET_TO_IMPALA_CODEC_SIZE = + sizeof(PARQUET_TO_IMPALA_CODEC) / sizeof(PARQUET_TO_IMPALA_CODEC[0]); + +/// Mapping of Impala codec enums to Parquet enums +const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = { + parquet::CompressionCodec::UNCOMPRESSED, + parquet::CompressionCodec::SNAPPY, // DEFAULT + parquet::CompressionCodec::GZIP, // GZIP + parquet::CompressionCodec::GZIP, // DEFLATE + parquet::CompressionCodec::SNAPPY, + parquet::CompressionCodec::SNAPPY, // SNAPPY_BLOCKED + parquet::CompressionCodec::LZO, +}; + +const int IMPALA_TO_PARQUET_CODEC_SIZE = + sizeof(IMPALA_TO_PARQUET_CODEC) / sizeof(IMPALA_TO_PARQUET_CODEC[0]); + +parquet::Type::type ConvertInternalToParquetType(PrimitiveType type) { + DCHECK_GE(type, 0); + DCHECK_LT(type, INTERNAL_TO_PARQUET_TYPES_SIZE); + return INTERNAL_TO_PARQUET_TYPES[type]; +} + +THdfsCompression::type ConvertParquetToImpalaCodec( + parquet::CompressionCodec::type codec) { + 
DCHECK_GE(codec, 0); + DCHECK_LT(codec, PARQUET_TO_IMPALA_CODEC_SIZE); + return PARQUET_TO_IMPALA_CODEC[codec]; +} + +parquet::CompressionCodec::type ConvertImpalaToParquetCodec( + THdfsCompression::type codec) { + DCHECK_GE(codec, 0); + DCHECK_LT(codec, IMPALA_TO_PARQUET_CODEC_SIZE); + return IMPALA_TO_PARQUET_CODEC[codec]; +} +} http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/be/src/exec/parquet-common.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h index 91712e4..a81064e 100644 --- a/be/src/exec/parquet-common.h +++ b/be/src/exec/parquet-common.h @@ -35,46 +35,18 @@ class TimestampValue; const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'}; const uint32_t PARQUET_CURRENT_VERSION = 1; -/// Mapping of impala's internal types to parquet storage types. This is indexed by -/// PrimitiveType enum -const parquet::Type::type INTERNAL_TO_PARQUET_TYPES[] = { - parquet::Type::BOOLEAN, // Invalid - parquet::Type::BOOLEAN, // NULL type - parquet::Type::BOOLEAN, - parquet::Type::INT32, - parquet::Type::INT32, - parquet::Type::INT32, - parquet::Type::INT64, - parquet::Type::FLOAT, - parquet::Type::DOUBLE, - parquet::Type::INT96, // Timestamp - parquet::Type::BYTE_ARRAY, // String - parquet::Type::BYTE_ARRAY, // Date, NYI - parquet::Type::BYTE_ARRAY, // DateTime, NYI - parquet::Type::BYTE_ARRAY, // Binary NYI - parquet::Type::FIXED_LEN_BYTE_ARRAY, // Decimal - parquet::Type::BYTE_ARRAY, // VARCHAR(N) - parquet::Type::BYTE_ARRAY, // CHAR(N) -}; - -/// Mapping of Parquet codec enums to Impala enums -const THdfsCompression::type PARQUET_TO_IMPALA_CODEC[] = { - THdfsCompression::NONE, - THdfsCompression::SNAPPY, - THdfsCompression::GZIP, - THdfsCompression::LZO -}; - -/// Mapping of Impala codec enums to Parquet enums -const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = { - parquet::CompressionCodec::UNCOMPRESSED, - parquet::CompressionCodec::SNAPPY, 
// DEFAULT - parquet::CompressionCodec::GZIP, // GZIP - parquet::CompressionCodec::GZIP, // DEFLATE - parquet::CompressionCodec::SNAPPY, - parquet::CompressionCodec::SNAPPY, // SNAPPY_BLOCKED - parquet::CompressionCodec::LZO, -}; +/// Return the Parquet type corresponding to Impala's internal type. The caller must +/// validate that the type is valid, otherwise this will DCHECK. +parquet::Type::type ConvertInternalToParquetType(PrimitiveType type); + +/// Return the Impala compression type for the given Parquet codec. The caller must +/// validate that the codec is a supported one, otherwise this will DCHECK. +THdfsCompression::type ConvertParquetToImpalaCodec(parquet::CompressionCodec::type codec); + +/// Return the Parquet codec for the given Impala compression type. The caller must +/// validate that the codec is a supported one, otherwise this will DCHECK. +parquet::CompressionCodec::type ConvertImpalaToParquetCodec( + THdfsCompression::type codec); /// The plain encoding does not maintain any state so all these functions /// are static helpers. 
http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/be/src/util/parquet-reader.cc ---------------------------------------------------------------------- diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc index 3c83c23..a50838c 100644 --- a/be/src/util/parquet-reader.cc +++ b/be/src/util/parquet-reader.cc @@ -137,7 +137,7 @@ int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_ boost::scoped_ptr<impala::Codec> decompressor; ABORT_IF_ERROR(impala::Codec::CreateDecompressor(NULL, false, - impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor)); + impala::ConvertParquetToImpalaCodec(col.meta_data.codec), &decompressor)); uint8_t* buffer_ptr = decompressed_buffer.data(); int uncompressed_page_size = header.uncompressed_page_size; http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/testdata/data/README ---------------------------------------------------------------------- diff --git a/testdata/data/README b/testdata/data/README index e293c3c..f02e70b 100644 --- a/testdata/data/README +++ b/testdata/data/README @@ -155,3 +155,10 @@ Created to test the read path for a Parquet file with invalid metadata, namely w 'max_value' and 'min_value' are both NaN. Contains 2 single-column rows: NaN 42 + +bad_codec.parquet: +Generated by Impala's Parquet writer, hacked to use the invalid enum value 5000 for the +compression codec. 
The data in the file is the whole of the "alltypestiny" data set, with +the same columns: id int, bool_col boolean, tinyint_col tinyint, smallint_col smallint, +int_col int, bigint_col bigint, float_col float, double_col double, +date_string_col string, string_col string, timestamp_col timestamp, year int, month int http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/testdata/data/bad_codec.parquet ---------------------------------------------------------------------- diff --git a/testdata/data/bad_codec.parquet b/testdata/data/bad_codec.parquet new file mode 100644 index 0000000..cef2228 Binary files /dev/null and b/testdata/data/bad_codec.parquet differ http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/testdata/workloads/functional-query/queries/QueryTest/parquet-bad-codec.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-bad-codec.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-bad-codec.test new file mode 100644 index 0000000..a52c683 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-bad-codec.test @@ -0,0 +1,6 @@ +==== +---- QUERY +select * from bad_codec +---- CATCH +uses an unsupported compression: 5000 for column +==== http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/tests/query_test/test_scanners.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index 97e72a9..a67b793 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -417,6 +417,19 @@ class TestParquet(ImpalaTestSuite): self.run_test_case('QueryTest/parquet-def-levels', vector, unique_database) + def test_bad_compression_codec(self, vector, unique_database): + """IMPALA-6593: test the bad compression codec is handled gracefully. 
""" + self.client.execute(("""CREATE TABLE {0}.bad_codec ( + id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT, + int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE, + date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP, + year INT, month INT) STORED AS PARQUET""").format(unique_database)) + tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (unique_database, + "bad_codec")) + check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] + + "/testdata/data/bad_codec.parquet", tbl_loc]) + self.run_test_case('QueryTest/parquet-bad-codec', vector, unique_database) + @SkipIfS3.hdfs_block_size @SkipIfADLS.hdfs_block_size @SkipIfIsilon.hdfs_block_size