Repository: impala
Updated Branches:
  refs/heads/master 8b7f27af8 -> 73e90d237


IMPALA-6592: add test for invalid parquet codecs

IMPALA-6592 revealed a gap in test coverage for files with
invalid/unsupported Parquet codecs. This adds a test that reproduces the
bug that was present in my IMPALA-4835 patch. master is unaffected by
this bug.

I also hid the conversion tables and made the conversion go through
functions that validate the enum values, to make it easier to track down
problems like this in the future.

Testing:
Ran exhaustive tests.

Change-Id: I1502ea7b7f39aa09f0ed2677e84219b37c64c416
Reviewed-on: http://gerrit.cloudera.org:8080/9500
Reviewed-by: Tim Armstrong <tarmstr...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/73e90d23
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/73e90d23
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/73e90d23

Branch: refs/heads/master
Commit: 73e90d237e5d0c59a06480b769e2dddf6891b49f
Parents: 8b7f27a
Author: Tim Armstrong <tarmstr...@cloudera.com>
Authored: Mon Mar 5 14:12:15 2018 -0800
Committer: Impala Public Jenkins <impala-public-jenk...@gerrit.cloudera.org>
Committed: Thu Mar 8 04:48:36 2018 +0000

----------------------------------------------------------------------
 be/src/exec/CMakeLists.txt                      |   1 +
 be/src/exec/hdfs-parquet-table-writer.cc        |  10 +-
 be/src/exec/parquet-column-readers.h            |   4 +-
 be/src/exec/parquet-common.cc                   |  91 +++++++++++++++++++
 be/src/exec/parquet-common.h                    |  52 +++--------
 be/src/util/parquet-reader.cc                   |   2 +-
 testdata/data/README                            |   7 ++
 testdata/data/bad_codec.parquet                 | Bin 0 -> 3026 bytes
 .../queries/QueryTest/parquet-bad-codec.test    |   6 ++
 tests/query_test/test_scanners.py               |  13 +++
 10 files changed, 138 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/be/src/exec/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index aab1383..ddd84ee 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -81,6 +81,7 @@ add_library(Exec
   kudu-scan-node-mt.cc
   kudu-table-sink.cc
   kudu-util.cc
+  parquet-common.cc
   read-write-util.cc
   scan-node.cc
   scanner-context.cc

http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 2d40f14..6370859 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -182,8 +182,8 @@ class HdfsParquetTableWriter::BaseColumnWriter {
   uint64_t num_values() const { return num_values_; }
   uint64_t total_compressed_size() const { return total_compressed_byte_size_; }
   uint64_t total_uncompressed_size() const { return total_uncompressed_byte_size_; }
-  parquet::CompressionCodec::type codec() const {
-    return IMPALA_TO_PARQUET_CODEC[codec_];
+  parquet::CompressionCodec::type GetParquetCodec() const {
+    return ConvertImpalaToParquetCodec(codec_);
   }
 
  protected:
@@ -860,7 +860,7 @@ Status HdfsParquetTableWriter::CreateSchema() {
     parquet::SchemaElement& node = file_metadata_.schema[i + 1];
     const ColumnType& type = output_expr_evals_[i]->root().type();
     node.name = table_desc_->col_descs()[i + num_clustering_cols].name();
-    node.__set_type(INTERNAL_TO_PARQUET_TYPES[type.type]);
+    node.__set_type(ConvertInternalToParquetType(type.type));
     node.__set_repetition_type(FieldRepetitionType::OPTIONAL);
     if (type.type == TYPE_DECIMAL) {
       // This column is type decimal. Update the file metadata to include the
@@ -901,10 +901,10 @@ Status HdfsParquetTableWriter::AddRowGroup() {
   current_row_group_->columns.resize(columns_.size());
   for (int i = 0; i < columns_.size(); ++i) {
     ColumnMetaData metadata;
-    metadata.type = INTERNAL_TO_PARQUET_TYPES[columns_[i]->type().type];
+    metadata.type = ConvertInternalToParquetType(columns_[i]->type().type);
     metadata.path_in_schema.push_back(
         table_desc_->col_descs()[i + num_clustering_cols].name());
-    metadata.codec = columns_[i]->codec();
+    metadata.codec = columns_[i]->GetParquetCodec();
     current_row_group_->columns[i].__set_meta_data(metadata);
   }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 86ca239..aa90bb8 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -355,7 +355,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
 
     if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
       RETURN_IF_ERROR(Codec::CreateDecompressor(
-          NULL, false, PARQUET_TO_IMPALA_CODEC[metadata_->codec], &decompressor_));
+          NULL, false, ConvertParquetToImpalaCodec(metadata_->codec), &decompressor_));
     }
     ClearDictionaryDecoder();
     return Status::OK();
@@ -376,7 +376,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   int col_idx() const { return node_.col_idx; }
   THdfsCompression::type codec() const {
     if (metadata_ == NULL) return THdfsCompression::NONE;
-    return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
+    return ConvertParquetToImpalaCodec(metadata_->codec);
   }
 
   /// Reads the next definition and repetition levels for this column. Initializes the

http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/be/src/exec/parquet-common.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.cc b/be/src/exec/parquet-common.cc
new file mode 100644
index 0000000..651e7fd
--- /dev/null
+++ b/be/src/exec/parquet-common.cc
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet-common.h"
+
+namespace impala {
+
+/// Mapping of impala's internal types to parquet storage types. This is indexed by
+/// PrimitiveType enum
+const parquet::Type::type INTERNAL_TO_PARQUET_TYPES[] = {
+  parquet::Type::BOOLEAN,     // Invalid
+  parquet::Type::BOOLEAN,     // NULL type
+  parquet::Type::BOOLEAN,
+  parquet::Type::INT32,
+  parquet::Type::INT32,
+  parquet::Type::INT32,
+  parquet::Type::INT64,
+  parquet::Type::FLOAT,
+  parquet::Type::DOUBLE,
+  parquet::Type::INT96,       // Timestamp
+  parquet::Type::BYTE_ARRAY,  // String
+  parquet::Type::BYTE_ARRAY,  // Date, NYI
+  parquet::Type::BYTE_ARRAY,  // DateTime, NYI
+  parquet::Type::BYTE_ARRAY,  // Binary NYI
+  parquet::Type::FIXED_LEN_BYTE_ARRAY, // Decimal
+  parquet::Type::BYTE_ARRAY,  // VARCHAR(N)
+  parquet::Type::BYTE_ARRAY,  // CHAR(N)
+};
+
+const int INTERNAL_TO_PARQUET_TYPES_SIZE =
+  sizeof(INTERNAL_TO_PARQUET_TYPES) / sizeof(INTERNAL_TO_PARQUET_TYPES[0]);
+
+/// Mapping of Parquet codec enums to Impala enums
+const THdfsCompression::type PARQUET_TO_IMPALA_CODEC[] = {
+  THdfsCompression::NONE,
+  THdfsCompression::SNAPPY,
+  THdfsCompression::GZIP,
+  THdfsCompression::LZO
+};
+
+const int PARQUET_TO_IMPALA_CODEC_SIZE =
+    sizeof(PARQUET_TO_IMPALA_CODEC) / sizeof(PARQUET_TO_IMPALA_CODEC[0]);
+
+/// Mapping of Impala codec enums to Parquet enums
+const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = {
+  parquet::CompressionCodec::UNCOMPRESSED,
+  parquet::CompressionCodec::SNAPPY,  // DEFAULT
+  parquet::CompressionCodec::GZIP,    // GZIP
+  parquet::CompressionCodec::GZIP,    // DEFLATE
+  parquet::CompressionCodec::SNAPPY,
+  parquet::CompressionCodec::SNAPPY,  // SNAPPY_BLOCKED
+  parquet::CompressionCodec::LZO,
+};
+
+const int IMPALA_TO_PARQUET_CODEC_SIZE =
+    sizeof(IMPALA_TO_PARQUET_CODEC) / sizeof(IMPALA_TO_PARQUET_CODEC[0]);
+
+parquet::Type::type ConvertInternalToParquetType(PrimitiveType type) {
+  DCHECK_GE(type, 0);
+  DCHECK_LT(type, INTERNAL_TO_PARQUET_TYPES_SIZE);
+  return INTERNAL_TO_PARQUET_TYPES[type];
+}
+
+THdfsCompression::type ConvertParquetToImpalaCodec(
+    parquet::CompressionCodec::type codec) {
+  DCHECK_GE(codec, 0);
+  DCHECK_LT(codec, PARQUET_TO_IMPALA_CODEC_SIZE);
+  return PARQUET_TO_IMPALA_CODEC[codec];
+}
+
+parquet::CompressionCodec::type ConvertImpalaToParquetCodec(
+    THdfsCompression::type codec) {
+  DCHECK_GE(codec, 0);
+  DCHECK_LT(codec, IMPALA_TO_PARQUET_CODEC_SIZE);
+  return IMPALA_TO_PARQUET_CODEC[codec];
+}
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/be/src/exec/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h
index 91712e4..a81064e 100644
--- a/be/src/exec/parquet-common.h
+++ b/be/src/exec/parquet-common.h
@@ -35,46 +35,18 @@ class TimestampValue;
 const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
 const uint32_t PARQUET_CURRENT_VERSION = 1;
 
-/// Mapping of impala's internal types to parquet storage types. This is indexed by
-/// PrimitiveType enum
-const parquet::Type::type INTERNAL_TO_PARQUET_TYPES[] = {
-  parquet::Type::BOOLEAN,     // Invalid
-  parquet::Type::BOOLEAN,     // NULL type
-  parquet::Type::BOOLEAN,
-  parquet::Type::INT32,
-  parquet::Type::INT32,
-  parquet::Type::INT32,
-  parquet::Type::INT64,
-  parquet::Type::FLOAT,
-  parquet::Type::DOUBLE,
-  parquet::Type::INT96,       // Timestamp
-  parquet::Type::BYTE_ARRAY,  // String
-  parquet::Type::BYTE_ARRAY,  // Date, NYI
-  parquet::Type::BYTE_ARRAY,  // DateTime, NYI
-  parquet::Type::BYTE_ARRAY,  // Binary NYI
-  parquet::Type::FIXED_LEN_BYTE_ARRAY, // Decimal
-  parquet::Type::BYTE_ARRAY,  // VARCHAR(N)
-  parquet::Type::BYTE_ARRAY,  // CHAR(N)
-};
-
-/// Mapping of Parquet codec enums to Impala enums
-const THdfsCompression::type PARQUET_TO_IMPALA_CODEC[] = {
-  THdfsCompression::NONE,
-  THdfsCompression::SNAPPY,
-  THdfsCompression::GZIP,
-  THdfsCompression::LZO
-};
-
-/// Mapping of Impala codec enums to Parquet enums
-const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = {
-  parquet::CompressionCodec::UNCOMPRESSED,
-  parquet::CompressionCodec::SNAPPY,  // DEFAULT
-  parquet::CompressionCodec::GZIP,    // GZIP
-  parquet::CompressionCodec::GZIP,    // DEFLATE
-  parquet::CompressionCodec::SNAPPY,
-  parquet::CompressionCodec::SNAPPY,  // SNAPPY_BLOCKED
-  parquet::CompressionCodec::LZO,
-};
+/// Return the Parquet type corresponding to Impala's internal type. The caller must
+/// validate that the type is valid, otherwise this will DCHECK.
+parquet::Type::type ConvertInternalToParquetType(PrimitiveType type);
+
+/// Return the Impala compression type for the given Parquet codec. The caller must
+/// validate that the codec is a supported one, otherwise this will DCHECK.
+THdfsCompression::type ConvertParquetToImpalaCodec(parquet::CompressionCodec::type codec);
+
+/// Return the Parquet code for the given Impala compression type. The caller must
+/// validate that the codec is a supported one, otherwise this will DCHECK.
+parquet::CompressionCodec::type ConvertImpalaToParquetCodec(
+    THdfsCompression::type codec);
 
 /// The plain encoding does not maintain any state so all these functions
 /// are static helpers.

http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/be/src/util/parquet-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/util/parquet-reader.cc b/be/src/util/parquet-reader.cc
index 3c83c23..a50838c 100644
--- a/be/src/util/parquet-reader.cc
+++ b/be/src/util/parquet-reader.cc
@@ -137,7 +137,7 @@ int CheckDataPage(const ColumnChunk& col, const PageHeader& header, const uint8_
 
     boost::scoped_ptr<impala::Codec> decompressor;
     ABORT_IF_ERROR(impala::Codec::CreateDecompressor(NULL, false,
-        impala::PARQUET_TO_IMPALA_CODEC[col.meta_data.codec], &decompressor));
+        impala::ConvertParquetToImpalaCodec(col.meta_data.codec), &decompressor));
 
     uint8_t* buffer_ptr = decompressed_buffer.data();
     int uncompressed_page_size = header.uncompressed_page_size;

http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/testdata/data/README
----------------------------------------------------------------------
diff --git a/testdata/data/README b/testdata/data/README
index e293c3c..f02e70b 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -155,3 +155,10 @@ Created to test the read path for a Parquet file with invalid metadata, namely w
 'max_value' and 'min_value' are both NaN. Contains 2 single-column rows:
 NaN
 42
+
+bad_codec.parquet:
+Generated by Impala's Parquet writer, hacked to use the invalid enum value 5000 for the
+compression codec. The data in the file is the whole of the "alltypestiny" data set, with
+the same columns: id int, bool_col boolean, tinyint_col tinyint, smallint_col smallint,
+int_col int, bigint_col bigint, float_col float, double_col double,
+date_string_col string, string_col string, timestamp_col timestamp, year int, month int

http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/testdata/data/bad_codec.parquet
----------------------------------------------------------------------
diff --git a/testdata/data/bad_codec.parquet b/testdata/data/bad_codec.parquet
new file mode 100644
index 0000000..cef2228
Binary files /dev/null and b/testdata/data/bad_codec.parquet differ

http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/testdata/workloads/functional-query/queries/QueryTest/parquet-bad-codec.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-bad-codec.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-bad-codec.test
new file mode 100644
index 0000000..a52c683
--- /dev/null
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-bad-codec.test
@@ -0,0 +1,6 @@
+====
+---- QUERY
+select * from bad_codec
+---- CATCH
+uses an unsupported compression: 5000 for column
+====

http://git-wip-us.apache.org/repos/asf/impala/blob/73e90d23/tests/query_test/test_scanners.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py
index 97e72a9..a67b793 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -417,6 +417,19 @@ class TestParquet(ImpalaTestSuite):
 
     self.run_test_case('QueryTest/parquet-def-levels', vector, unique_database)
 
+  def test_bad_compression_codec(self, vector, unique_database):
+    """IMPALA-6593: test the bad compression codec is handled gracefully. """
+    self.client.execute(("""CREATE TABLE {0}.bad_codec (
+          id INT, bool_col BOOLEAN, tinyint_col TINYINT, smallint_col SMALLINT,
+          int_col INT, bigint_col BIGINT, float_col FLOAT, double_col DOUBLE,
+          date_string_col STRING, string_col STRING, timestamp_col TIMESTAMP,
+          year INT, month INT) STORED AS PARQUET""").format(unique_database))
+    tbl_loc = get_fs_path("/test-warehouse/%s.db/%s" % (unique_database,
+        "bad_codec"))
+    check_call(['hdfs', 'dfs', '-copyFromLocal', os.environ['IMPALA_HOME'] +
+        "/testdata/data/bad_codec.parquet", tbl_loc])
+    self.run_test_case('QueryTest/parquet-bad-codec', vector, unique_database)
+
   @SkipIfS3.hdfs_block_size
   @SkipIfADLS.hdfs_block_size
   @SkipIfIsilon.hdfs_block_size

Reply via email to