Repository: incubator-impala Updated Branches: refs/heads/master cdbcdca67 -> 858f5c219
IMPALA-4363: Add Parquet timestamp validation Before this patch, we would simply read the INT96 Parquet timestamp representation and assume that it's valid. However, not all bit permutations represent a valid timestamp. One of the boost functions raised an exception (that we didn't catch) when passed an invalid boost date object, which resulted in a crash. This patch fixes the problem by validating that the date falls into the 1400..9999 year range as we are scanning Parquet. Change-Id: Ieaab5d33e6f0df831d0e67e1d318e5416ffb90ac Reviewed-on: http://gerrit.cloudera.org:8080/5343 Reviewed-by: Taras Bobrovytsky <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/858f5c21 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/858f5c21 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/858f5c21 Branch: refs/heads/master Commit: 858f5c219710f1b72b25e509643f0cf9e1113dee Parents: cdbcdca Author: Taras Bobrovytsky <[email protected]> Authored: Fri Nov 4 17:12:04 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Sat Dec 3 06:41:07 2016 +0000 ---------------------------------------------------------------------- be/src/exec/parquet-column-readers.cc | 81 ++++++++++++++----- be/src/exec/parquet-column-readers.h | 14 +--- be/src/runtime/timestamp-value.h | 14 ++++ common/thrift/generate_error_codes.py | 4 + testdata/data/README | 10 +++ testdata/data/out_of_range_timestamp.parquet | Bin 0 -> 203 bytes .../queries/QueryTest/grant_revoke.test | 2 +- .../out-of-range-timestamp-abort-on-error.test | 8 ++ ...ut-of-range-timestamp-continue-on-error.test | 15 ++++ .../QueryTest/parquet-resolution-by-name.test | 3 +- .../queries/QueryTest/parquet.test | 2 +- tests/common/impala_test_suite.py | 7 +- tests/query_test/test_scanners.py | 17 ++++ 13 files changed, 141 insertions(+), 36 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/be/src/exec/parquet-column-readers.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc index 2a83e9c..535d9be 100644 --- a/be/src/exec/parquet-column-readers.cc +++ b/be/src/exec/parquet-column-readers.cc @@ -254,7 +254,7 @@ class ScalarColumnReader : public BaseScalarColumnReader { } protected: - template <bool IN_COLLECTION> + template<bool IN_COLLECTION> inline bool ReadValue(MemPool* pool, Tuple* tuple) { // NextLevels() should have already been called and def and rep levels should be in // valid range. @@ -268,9 +268,9 @@ class ScalarColumnReader : public BaseScalarColumnReader { if (MATERIALIZED) { if (def_level_ >= max_def_level()) { if (page_encoding_ == parquet::Encoding::PLAIN_DICTIONARY) { - if (!ReadSlot<true>(tuple->GetSlot(tuple_offset_), pool)) return false; + if (!ReadSlot<true>(tuple, pool)) return false; } else { - if (!ReadSlot<false>(tuple->GetSlot(tuple_offset_), pool)) return false; + if (!ReadSlot<false>(tuple, pool)) return false; } } else { tuple->SetNull(null_indicator_offset_); @@ -382,8 +382,7 @@ class ScalarColumnReader : public BaseScalarColumnReader { if (MATERIALIZED) { if (def_level >= max_def_level()) { - bool continue_execution = - ReadSlot<IS_DICT_ENCODED>(tuple->GetSlot(tuple_offset_), pool); + bool continue_execution = ReadSlot<IS_DICT_ENCODED>(tuple, pool); if (UNLIKELY(!continue_execution)) return false; } else { tuple->SetNull(null_indicator_offset_); @@ -443,13 +442,15 @@ class ScalarColumnReader : public BaseScalarColumnReader { } private: - /// Writes the next value into *slot using pool if necessary. + /// Writes the next value into the appropriate destination slot in 'tuple' using pool + /// if necessary. 
/// /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns /// true. template<bool IS_DICT_ENCODED> - inline bool ReadSlot(void* slot, MemPool* pool) { + inline bool ReadSlot(Tuple* tuple, MemPool* pool) { + void* slot = tuple->GetSlot(tuple_offset_); T val; T* val_ptr = NeedsConversion() ? &val : reinterpret_cast<T*>(slot); if (IS_DICT_ENCODED) { @@ -468,8 +469,13 @@ class ScalarColumnReader : public BaseScalarColumnReader { } data_ += encoded_len; } + + if (UNLIKELY(NeedsValidation() && !ValidateSlot(val_ptr, tuple))) { + return false; + } if (UNLIKELY(NeedsConversion() && - !ConvertSlot(&val, reinterpret_cast<T*>(slot), pool))) { + !tuple->IsNull(null_indicator_offset_) && + !ConvertSlot(&val, reinterpret_cast<T*>(slot), pool))) { return false; } return true; @@ -483,14 +489,28 @@ class ScalarColumnReader : public BaseScalarColumnReader { return false; } + /// Similar to NeedsCoversion(), most column readers do not require validation, + /// so to avoid branches, we return constant false. In general, types where not + /// all possible bit representations of the data type are valid should be + /// validated. + inline bool NeedsValidation() const { + return false; + } + /// Converts and writes src into dst based on desc_->type() bool ConvertSlot(const T* src, T* dst, MemPool* pool) { DCHECK(false); return false; } - /// Pull out slow-path Status construction code from ReadRepetitionLevel()/ - /// ReadDefinitionLevel() for performance. + /// Sets error message and returns false if the slot value is invalid, e.g., due to + /// being out of the valid value range. 
+ bool ValidateSlot(T* src, Tuple* tuple) const { + DCHECK(false); + return false; + } + + /// Pull out slow-path Status construction code void __attribute__((noinline)) SetDictDecodeError() { parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE, filename(), slot_desc_->type().DebugString(), stream_->file_offset()); @@ -562,6 +582,27 @@ bool ScalarColumnReader<TimestampValue, true>::ConvertSlot( return true; } +template<> +inline bool ScalarColumnReader<TimestampValue, true>::NeedsValidation() const { + return true; +} + +template<> +bool ScalarColumnReader<TimestampValue, true>::ValidateSlot( + TimestampValue* src, Tuple* tuple) const { + if (UNLIKELY(!src->IsValidDate())) { + ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE, + filename(), node_.element->name); + Status status = parent_->state_->LogOrReturnError(msg); + if (!status.ok()) { + parent_->parse_status_ = status; + return false; + } + tuple->SetNull(null_indicator_offset_); + } + return true; +} + class BoolColumnReader : public BaseScalarColumnReader { public: BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node, @@ -614,7 +655,7 @@ class BoolColumnReader : public BaseScalarColumnReader { "Caller should have called NextLevels() until we are ready to read a value"; if (def_level_ >= max_def_level()) { - return ReadSlot<IN_COLLECTION>(tuple->GetSlot(tuple_offset_), pool); + return ReadSlot<IN_COLLECTION>(tuple, pool); } else { // Null value tuple->SetNull(null_indicator_offset_); @@ -622,14 +663,15 @@ class BoolColumnReader : public BaseScalarColumnReader { } } - /// Writes the next value into *slot using pool if necessary. Also advances def_level_ - /// and rep_level_ via NextLevels(). + /// Writes the next value into the next slot in the *tuple using pool if necessary. + /// Also advances def_level_ and rep_level_ via NextLevels(). /// /// Returns false if execution should be aborted for some reason, e.g. 
parse_error_ is /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns /// true. - template <bool IN_COLLECTION> - inline bool ReadSlot(void* slot, MemPool* pool) { + template<bool IN_COLLECTION> + inline bool ReadSlot(Tuple* tuple, MemPool* pool) { + void* slot = tuple->GetSlot(tuple_offset_); if (!bool_values_.GetValue(1, reinterpret_cast<bool*>(slot))) { parent_->parse_status_ = Status("Invalid bool column."); return false; @@ -954,7 +996,7 @@ Status BaseScalarColumnReader::ReadDataPage() { return Status::OK(); } -template <bool ADVANCE_REP_LEVEL> +template<bool ADVANCE_REP_LEVEL> bool BaseScalarColumnReader::NextLevels() { if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString(); @@ -1020,7 +1062,7 @@ bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) { if (tuple_offset_ == -1) { return CollectionColumnReader::NextLevels(); } else if (def_level_ >= max_def_level()) { - return ReadSlot(tuple->GetSlot(tuple_offset_), pool); + return ReadSlot(tuple, pool); } else { // Null value tuple->SetNull(null_indicator_offset_); @@ -1033,12 +1075,13 @@ bool CollectionColumnReader::ReadNonRepeatedValue( return CollectionColumnReader::ReadValue(pool, tuple); } -bool CollectionColumnReader::ReadSlot(void* slot, MemPool* pool) { +bool CollectionColumnReader::ReadSlot(Tuple* tuple, MemPool* pool) { DCHECK(!children_.empty()); DCHECK_LE(rep_level_, new_collection_rep_level()); // Recursively read the collection into a new CollectionValue. 
- CollectionValue* coll_slot = reinterpret_cast<CollectionValue*>(slot); + CollectionValue* coll_slot = reinterpret_cast<CollectionValue*>( + tuple->GetSlot(tuple_offset_)); *coll_slot = CollectionValue(); CollectionValueBuilder builder( coll_slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/be/src/exec/parquet-column-readers.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h index 85acb88..5d68a24 100644 --- a/be/src/exec/parquet-column-readers.h +++ b/be/src/exec/parquet-column-readers.h @@ -440,15 +440,6 @@ class BaseScalarColumnReader : public ParquetColumnReader { /// 'size' bytes remaining. virtual Status InitDataPage(uint8_t* data, int size) = 0; - private: - /// Writes the next value into *slot using pool if necessary. Also advances rep_level_ - /// and def_level_ via NextLevels(). - /// - /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is - /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns - /// true. - template <bool IN_COLLECTION> - inline bool ReadSlot(void* slot, MemPool* pool); }; /// Collections are not materialized directly in parquet files; only scalar values appear @@ -511,12 +502,13 @@ class CollectionColumnReader : public ParquetColumnReader { void UpdateDerivedState(); /// Recursively reads from children_ to assemble a single CollectionValue into - /// *slot. Also advances rep_level_ and def_level_ via NextLevels(). + /// the appropriate destination slot in 'tuple'. Also advances rep_level_ and + /// def_level_ via NextLevels(). /// /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns /// true. 
- inline bool ReadSlot(void* slot, MemPool* pool); + inline bool ReadSlot(Tuple* tuple, MemPool* pool); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/be/src/runtime/timestamp-value.h ---------------------------------------------------------------------- diff --git a/be/src/runtime/timestamp-value.h b/be/src/runtime/timestamp-value.h index e722375..5882928 100644 --- a/be/src/runtime/timestamp-value.h +++ b/be/src/runtime/timestamp-value.h @@ -20,6 +20,7 @@ #define IMPALA_RUNTIME_TIMESTAMP_VALUE_H #include <boost/date_time/compiler_config.hpp> +#include <boost/date_time/gregorian/gregorian.hpp> #include <boost/date_time/posix_time/conversion.hpp> #include <boost/date_time/posix_time/posix_time_types.hpp> #include <ctime> @@ -138,6 +139,19 @@ class TimestampValue { std::string DebugString() const; + /// Verifies that the timestamp date falls into a valid range (years 1400..9999). + inline bool IsValidDate() const { + // Smallest valid day number. + const static int64_t MIN_DAY_NUMBER = static_cast<int64_t>( + boost::gregorian::date(boost::date_time::min_date_time).day_number()); + // Largest valid day number. + const static int64_t MAX_DAY_NUMBER = static_cast<int64_t>( + boost::gregorian::date(boost::date_time::max_date_time).day_number()); + + return date_.day_number() >= MIN_DAY_NUMBER + && date_.day_number() <= MAX_DAY_NUMBER; + } + /// Formats the timestamp using the given date/time context and places the result in the /// string buffer. The size of the buffer should be at least dt_ctx.fmt_out_len + 1. A /// string terminator will be appended to the string. 
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/common/thrift/generate_error_codes.py ---------------------------------------------------------------------- diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py index c96cfc8..b1ff4c8 100755 --- a/common/thrift/generate_error_codes.py +++ b/common/thrift/generate_error_codes.py @@ -306,6 +306,10 @@ error_codes = ( ("KUDU_NULL_CONSTRAINT_VIOLATION", 100, "Row with null value violates nullability constraint on table '$0'."), + + ("PARQUET_TIMESTAMP_OUT_OF_RANGE", 101, + "Parquet file '$0' column '$1' contains an out of range timestamp. " + "The valid date range is 1400-01-01..9999-12-31."), ) import sys http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/testdata/data/README ---------------------------------------------------------------------- diff --git a/testdata/data/README b/testdata/data/README index fce8014..3eb4ba0 100644 --- a/testdata/data/README +++ b/testdata/data/README @@ -78,3 +78,13 @@ Contains 1 column, uncompressed data size < 8M large_pbzip2.bz2 Generated with pbzip2, contains multiple bzip2 stream Contains 1 column, uncompressed data size > 8M + +out_of_range_timestamp.parquet: +----------- +Generated with a hacked version of Impala parquet writer. 
+Contains a single timestamp column with 4 values, 2 of which are out of range +and should be read as NULL by Impala: + 1399-12-31 00:00:00 (invalid - date too small) + 1400-01-01 00:00:00 + 9999-12-31 00:00:00 + 10000-01-01 00:00:00 (invalid - date too large) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/testdata/data/out_of_range_timestamp.parquet ---------------------------------------------------------------------- diff --git a/testdata/data/out_of_range_timestamp.parquet b/testdata/data/out_of_range_timestamp.parquet new file mode 100644 index 0000000..18d4e5c Binary files /dev/null and b/testdata/data/out_of_range_timestamp.parquet differ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test b/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test index fe340c2..3b12db4 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test +++ b/testdata/workloads/functional-query/queries/QueryTest/grant_revoke.test @@ -204,7 +204,7 @@ STRING # on any URIs. The FE tests have additional error message verification. 
create table grant_rev_db.test_tbl2(i int) location '$FILESYSTEM_PREFIX/test-warehouse/grant_rev_test_tbl2'; ---- CATCH -does not have privileges to access: $NAMENODE/test-warehouse/grant_rev_test_tbl2 +does not have privileges to access: __HDFS_FILENAME__ ==== ---- QUERY grant role grant_revoke_test_ALL_URI to group $GROUP_NAME http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-abort-on-error.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-abort-on-error.test b/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-abort-on-error.test new file mode 100644 index 0000000..087066a --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-abort-on-error.test @@ -0,0 +1,8 @@ +==== +---- QUERY +# IMPALA-4363: Verify that if an out of range parquet timestamp is read, +# the query is aborted with the right error message. +SELECT * FROM out_of_range_timestamp; +---- CATCH +Parquet file '__HDFS_FILENAME__' column 'ts' contains an out of range timestamp. The valid date range is 1400-01-01..9999-12-31. 
+==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-continue-on-error.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-continue-on-error.test b/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-continue-on-error.test new file mode 100644 index 0000000..c571034 --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/out-of-range-timestamp-continue-on-error.test @@ -0,0 +1,15 @@ +==== +---- QUERY +# IMPALA-4363: Verify that if an out of range parquet timestamp is read, +# it's treated as a NULL and a warning is issued. +SELECT * FROM out_of_range_timestamp; +---- TYPES +TIMESTAMP +---- RESULTS +NULL +1400-01-01 00:00:00 +9999-12-31 00:00:00 +NULL +---- ERRORS +Parquet file '__HDFS_FILENAME__' column 'ts' contains an out of range timestamp. The valid date range is 1400-01-01..9999-12-31. 
(1 of 2 similar) +==== http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test index 2546e9c..d4b8a43 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-resolution-by-name.test @@ -220,8 +220,7 @@ string,int set parquet_fallback_schema_resolution="position"; select key, value from switched_map_fields_resolution_test.int_map ---- CATCH -File '$NAMENODE/test-warehouse/$DATABASE.db/switched_map_fields_resolution_test/ -switched_map.parq' has an incompatible Parquet schema for column +File '__HDFS_FILENAME__' has an incompatible Parquet schema for column '$DATABASE.switched_map_fields_resolution_test.int_map.key'. 
Column type: STRING, Parquet schema: required int32 value [i:0 d:1 r:1] http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/testdata/workloads/functional-query/queries/QueryTest/parquet.test ---------------------------------------------------------------------- diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet.test b/testdata/workloads/functional-query/queries/QueryTest/parquet.test index a449162..3946b94 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/parquet.test +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet.test @@ -50,7 +50,7 @@ bigint,bigint,string,string,boolean,boolean,bigint,bigint,bigint,bigint # Parquet file with invalid magic number SELECT * from bad_magic_number ---- CATCH -File '$NAMENODE/test-warehouse/bad_magic_number_parquet/bad_magic_number.parquet' has an invalid version number: XXXX +File '__HDFS_FILENAME__' has an invalid version number: XXXX ==== ---- QUERY # count(*) query on parquet file with multiple blocks (one block per node) http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/tests/common/impala_test_suite.py ---------------------------------------------------------------------- diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py index 38849db..3a45d0c 100644 --- a/tests/common/impala_test_suite.py +++ b/tests/common/impala_test_suite.py @@ -43,7 +43,10 @@ from tests.common.test_dimensions import ( create_exec_option_dimension, get_dataset_from_workload, load_table_info_dimension) -from tests.common.test_result_verifier import verify_raw_results, verify_runtime_profile +from tests.common.test_result_verifier import ( + apply_error_match_filter, + verify_raw_results, + verify_runtime_profile) from tests.common.test_vector import TestDimension from tests.performance.query import Query from tests.performance.query_exec_functions import execute_using_jdbc @@ -199,7 +202,7 @@ class ImpalaTestSuite(BaseTestSuite): 
Verifies that at least one of the strings in 'expected_str' is a substring of the actual exception string 'actual_str'. """ - actual_str = actual_str.replace('\n', '') + actual_str = ''.join(apply_error_match_filter([actual_str.replace('\n', '')])) for expected_str in expected_strs: # In error messages, some paths are always qualified and some are not. # So, allow both $NAMENODE and $FILESYSTEM_PREFIX to be used in CATCH. http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/858f5c21/tests/query_test/test_scanners.py ---------------------------------------------------------------------- diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index 0231261..cd1b94a 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -245,6 +245,23 @@ class TestParquet(ImpalaTestSuite): vector.get_value('exec_option')['abort_on_error'] = 1 self.run_test_case('QueryTest/parquet-abort-on-error', vector) + def test_timestamp_out_of_range(self, vector, unique_database): + """IMPALA-4363: Test scanning parquet files with an out of range timestamp.""" + self.client.execute(("create table {0}.out_of_range_timestamp (ts timestamp) " + "stored as parquet").format(unique_database)) + out_of_range_timestamp_loc = get_fs_path( + "/test-warehouse/{0}.db/{1}".format(unique_database, "out_of_range_timestamp")) + check_call(['hdfs', 'dfs', '-copyFromLocal', + os.environ['IMPALA_HOME'] + "/testdata/data/out_of_range_timestamp.parquet", + out_of_range_timestamp_loc]) + + vector.get_value('exec_option')['abort_on_error'] = 0 + self.run_test_case('QueryTest/out-of-range-timestamp-continue-on-error', + vector, unique_database) + vector.get_value('exec_option')['abort_on_error'] = 1 + self.run_test_case('QueryTest/out-of-range-timestamp-abort-on-error', + vector, unique_database) + def test_zero_rows(self, vector, unique_database): """IMPALA-3943: Tests that scanning files with num_rows=0 in the file footer succeeds without errors."""
